summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <[email protected]>2023-09-06 18:00:47 +0300
committertesseract <[email protected]>2023-09-06 18:19:54 +0300
commitf37a025ab3c1e0a083536193434e0442181d854a (patch)
tree6553671ab3916142875f6d3471a32aeef24e3ff0
parent990d611aed1384d97f76e99eb2a91159635a48ca (diff)
Fix SchemaActor VERIFY
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp193
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.h12
2 files changed, 115 insertions, 90 deletions
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 99a2e3b0ee6..b45283baeec 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -504,7 +504,7 @@ bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActor
HFuncCtx(NKikimr::TEvPersQueue::TEvStatusResponse, Handle, ctx);
HFuncCtx(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle, ctx);
HFuncCtx(TEvPersQueue::TEvGetPartitionsLocationResponse, Handle, ctx);
- HFuncCtx(TEvents::TEvWakeup, HandleWakeup, ctx);
+ HFuncCtx(TEvPQProxy::TEvRequestTablet, Handle, ctx);
default: return false;
}
return true;
@@ -550,33 +550,53 @@ void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQue
void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) {
auto it = Tablets.find(tabletId);
if (it == Tablets.end()) return;
- if (pipe && pipe != it->second.Pipe) return;
- if (--it->second.RetriesLeft == 0) {
- return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx);
- }
- it->second.Pipe = TActorId{};
+
+ auto& tabletInfo = it->second;
+ if (pipe && pipe != tabletInfo.Pipe) return;
+ if (tabletInfo.ResultRecived) return;
+
if (tabletId == BalancerTabletId) {
+ if (GotLocation && GotReadSessions) {
+ return;
+ }
BalancerPipe = nullptr;
}
- Y_VERIFY(RequestsInfly > 0);
- --RequestsInfly;
- if (delay == TDuration::Zero()) {
- RequestTablet(it->second, ctx);
- } else {
- ++RequestsInfly;
- ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId));
+
+ NTabletPipe::CloseClient(ctx, tabletInfo.Pipe);
+ tabletInfo.Pipe = TActorId{};
+
+ if (--it->second.RetriesLeft == 0) {
+ return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx);
}
+
+ ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId));
}
void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx) {
- --RequestsInfly;
auto it = Tablets.find(ev->Get()->TabletId);
if (it == Tablets.end()) return;
- if (ev->Get()->TabletId == BalancerTabletId && PendingLocation) {
- PendingLocation = false;
+ auto& tabletInfo = it->second;
+
+ if (ev->Get()->TabletId == BalancerTabletId) {
+ if (GotLocation && GotReadSessions) {
+ return;
+ }
+ if (!GotLocation) {
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
+ }
+ if (!GotReadSessions) {
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
+ }
+ } else if (tabletInfo.ResultRecived) {
+ return;
+ } else {
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
}
- RequestTablet(it->second, ctx);
+ RequestTablet(tabletInfo, ctx);
}
void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) {
@@ -598,36 +618,43 @@ void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorCon
if (tablet.TabletId == BalancerTabletId) {
BalancerPipe = &tablet.Pipe;
- return RequestBalancer(ctx);
+ RequestBalancer(ctx);
} else {
- THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(
- Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx),
- Settings.Consumer.empty()
- ));
- NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
+ RequestPartitionStatus(tablet, ctx);
}
- ++RequestsInfly;
}
void TDescribeTopicActorImpl::RequestBalancer(const TActorContext& ctx) {
Y_VERIFY(BalancerTabletId);
- if (Settings.RequireLocation && !PendingLocation && !GotLocation) {
- return RequestPartitionsLocationIfRequired(ctx);
+ if (Settings.RequireLocation) {
+ if (!GotLocation) {
+ RequestPartitionsLocation(ctx);
+ }
+ } else {
+ GotLocation = true;
}
+
if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer && Settings.RequireStats) {
- NTabletPipe::SendData(
- ctx, *BalancerPipe,
- new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx))
- );
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions");
- ++RequestsInfly;
+ if (!GotReadSessions) {
+ RequestReadSessionsInfo(ctx);
+ }
+ } else {
+ GotReadSessions = true;
}
}
-void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorContext& ctx) {
- if (!Settings.RequireLocation || PendingLocation)
- return;
+void TDescribeTopicActorImpl::RequestPartitionStatus(const TTabletInfo& tablet, const TActorContext& ctx) {
+ THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(
+ Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx),
+ Settings.Consumer.empty()
+ ));
+ NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
+ ++RequestsInfly;
+}
+
+void TDescribeTopicActorImpl::RequestPartitionsLocation(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request location");
+
THashSet<ui64> partIds;
TVector<ui64> partsVector;
for (auto p : Settings.Partitions) {
@@ -646,16 +673,25 @@ void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorCo
ctx, *BalancerPipe,
new TEvPersQueue::TEvGetPartitionsLocation(partsVector)
);
- PendingLocation = true;
- GotLocation = false;
+ ++RequestsInfly;
+}
+
+void TDescribeTopicActorImpl::RequestReadSessionsInfo(const TActorContext& ctx) {
+ NTabletPipe::SendData(
+ ctx, *BalancerPipe,
+ new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx))
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions");
+ ++RequestsInfly;
}
void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
auto it = Tablets.find(ev->Get()->Record.GetTabletId());
if (it == Tablets.end()) return;
- --RequestsInfly;
- NTabletPipe::CloseClient(ctx, it->second.Pipe);
- it->second.Pipe = TActorId{};
+
+ auto& tabletInfo = it->second;
+
+ if (tabletInfo.ResultRecived) return;
auto& record = ev->Get()->Record;
for (auto& partResult : record.GetPartResult()) {
@@ -666,13 +702,17 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T
}
}
- ApplyResponse(it->second, ev, ctx);
+ tabletInfo.ResultRecived = true;
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
+
+ NTabletPipe::CloseClient(ctx, tabletInfo.Pipe);
+ tabletInfo.Pipe = TActorId{};
+
+ ApplyResponse(tabletInfo, ev, ctx);
if (RequestsInfly == 0) {
- RequestAdditionalInfo(ctx);
- if (RequestsInfly == 0 && !PendingLocation) {
- Reply(ctx);
- }
+ Reply(ctx);
}
}
@@ -680,65 +720,57 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T
void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got sessions");
- if (BalancerTabletId == 0)
+ if (GotReadSessions)
return;
+
auto it = Tablets.find(BalancerTabletId);
Y_VERIFY(it != Tablets.end());
+
+ GotReadSessions = true;
+ Y_VERIFY(RequestsInfly > 0);
--RequestsInfly;
- NTabletPipe::CloseClient(ctx, *BalancerPipe);
- *BalancerPipe = TActorId{};
- BalancerTabletId = 0;
+ CheckCloseBalancerPipe(ctx);
ApplyResponse(it->second, ev, ctx);
if (RequestsInfly == 0) {
- RequestAdditionalInfo(ctx);
- if (RequestsInfly == 0 && !PendingLocation) {
- Reply(ctx);
- }
+ Reply(ctx);
}
}
void TDescribeTopicActorImpl::Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got location");
- if (BalancerTabletId == 0)
+
+ if (GotLocation)
return;
+
auto it = Tablets.find(BalancerTabletId);
Y_VERIFY(it != Tablets.end());
- PendingLocation = false;
+
const auto& record = ev->Get()->Record;
if (record.GetStatus()) {
auto res = ApplyResponse(ev, ctx);
if (res) {
GotLocation = true;
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
+
CheckCloseBalancerPipe(ctx);
- if (!RequestsInfly && !PendingLocation) {
+
+ if (RequestsInfly == 0) {
Reply(ctx);
}
return;
}
}
+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Something wrong on location, retry. Response: " << record.DebugString());
//Something gone wrong, retry
- ctx.Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup());
-}
-
-void TDescribeTopicActorImpl::HandleWakeup(TEvents::TEvWakeup::TPtr&, const NActors::TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Wakeup");
- RequestPartitionsLocationIfRequired(ctx);
- if (!RequestsInfly && !PendingLocation) {
- return Reply(ctx);
- }
-}
-
-void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) {
- if (BalancerTabletId) {
- RequestTablet(BalancerTabletId, ctx);
- }
+ ctx.Schedule(TDuration::MilliSeconds(200), new TEvPQProxy::TEvRequestTablet(BalancerTabletId));
}
void TDescribeTopicActorImpl::CheckCloseBalancerPipe(const TActorContext& ctx) {
- if (RequestsInfly || PendingLocation)
+ if (!GotLocation || !GotReadSessions)
return;
NTabletPipe::CloseClient(ctx, *BalancerPipe);
*BalancerPipe = TActorId{};
@@ -1204,11 +1236,6 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::
bool TDescribeTopicActorImpl::ProcessTablets(
const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx
) {
- auto addBalancer = [&] {
- BalancerTabletId = pqDescr.GetBalancerTabletID();
- Tablets[BalancerTabletId].TabletId = BalancerTabletId;
- };
-
auto partitionFilter = [&] (ui32 partId) {
if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribePartitions) {
return Settings.RequireStats && partId == Settings.Partitions[0];
@@ -1219,6 +1246,9 @@ bool TDescribeTopicActorImpl::ProcessTablets(
};
TotalPartitions = pqDescr.GetTotalGroupCount();
+ BalancerTabletId = pqDescr.GetBalancerTabletID();
+ Tablets[BalancerTabletId].TabletId = BalancerTabletId;
+
for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) {
const auto& pi = pqDescr.GetPartitions(i);
if (!partitionFilter(pi.GetPartitionId())) {
@@ -1227,17 +1257,12 @@ bool TDescribeTopicActorImpl::ProcessTablets(
Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId());
Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId();
}
+
for (auto& pair : Tablets) {
RequestTablet(pair.second, ctx);
}
- if (Settings.RequireLocation) {
- addBalancer();
- RequestAdditionalInfo(ctx);
- } else if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer) {
- addBalancer();
- }
- if (RequestsInfly == 0 && !PendingLocation) {
+ if (RequestsInfly == 0) {
Reply(ctx);
return false;
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h
index 1f492755df0..2ddb53c59e2 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.h
+++ b/ydb/services/persqueue_v1/actors/schema_actors.h
@@ -119,6 +119,7 @@ protected:
TActorId Pipe;
ui32 NodeId = 0;
ui32 RetriesLeft = 3;
+ bool ResultRecived = false;
TTabletInfo() = default;
TTabletInfo(ui64 tabletId)
@@ -139,16 +140,15 @@ public:
void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx);
- void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
-
bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx);
void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx);
void RequestTablet(ui64 tabletId, const TActorContext& ctx);
void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero());
- void RequestAdditionalInfo(const TActorContext& ctx);
void RequestBalancer(const TActorContext& ctx);
- void RequestPartitionsLocationIfRequired(const TActorContext& ctx);
+ void RequestPartitionStatus(const TTabletInfo& tablet, const TActorContext& ctx);
+ void RequestPartitionsLocation(const TActorContext& ctx);
+ void RequestReadSessionsInfo(const TActorContext& ctx);
void CheckCloseBalancerPipe(const TActorContext& ctx);
bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
@@ -166,11 +166,11 @@ public:
virtual void Reply(const TActorContext& ctx) = 0;
private:
-
std::map<ui64, TTabletInfo> Tablets;
ui32 RequestsInfly = 0;
- bool PendingLocation = false;
+
bool GotLocation = false;
+ bool GotReadSessions = false;
TActorId* BalancerPipe = nullptr;