aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryumkam <yumkam7@ydb.tech>2025-04-15 19:59:36 +0300
committerGitHub <noreply@github.com>2025-04-15 19:59:36 +0300
commit048e65e557439b62712cba204836b98bbb3933cc (patch)
treee5a3a08e3892c71ce3487aaa8360e0470695d892
parent476779e1e222b46d41445045f2c60f81a25d7a5c (diff)
downloadydb-048e65e557439b62712cba204836b98bbb3933cc.tar.gz
pq rd: fix use-after-free with logbroker federation (v2) (#17189)
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp80
1 files changed, 68 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index fd00b1df907..ca03490ac75 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -234,6 +234,7 @@ private:
std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher)
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker;
+ // Set on Parent
ui64 CpuMicrosec = 0;
// Set on both Parent (cumulative) and Children (separate)
@@ -377,10 +378,47 @@ public:
hFunc(TEvPrivate::TEvRefreshClusters, Handle);
hFunc(TEvPrivate::TEvReceivedClusters, Handle);
hFunc(TEvPrivate::TEvDescribeTopicResult, Handle);
+ })
- cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ STRICT_STFUNC(IgnoreState, {
+ // State installed by PassAway() via Become(): ignore all events except for retry queue; row dispatcher events are answered through ReplyNoSession, except TEvCoordinatorChanged, which is only logged by IgnoreEvent
+ hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, IgnoreEvent);
+ hFunc(NFq::TEvRowDispatcher::TEvCoordinatorResult, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvMessageBatch, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvStartSessionAck, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession);
+ hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession);
+
+ hFunc(NActors::TEvents::TEvPong, Handle); // this group (up to the blank line) is not ignored: it is dispatched to Handle / HandleConnected / HandleDisconnected
+ hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
+ hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
+
+ // row dispatcher heartbeat is answered through ReplyNoSession; TEvPrivate events are only logged by IgnoreEvent
+ hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, ReplyNoSession);
+ hFunc(TEvPrivate::TEvPrintState, IgnoreEvent);
+ hFunc(TEvPrivate::TEvProcessState, IgnoreEvent);
+ hFunc(TEvPrivate::TEvNotifyCA, IgnoreEvent);
+ hFunc(TEvPrivate::TEvRefreshClusters, IgnoreEvent);
+ hFunc(TEvPrivate::TEvReceivedClusters, IgnoreEvent);
+ hFunc(TEvPrivate::TEvDescribeTopicResult, IgnoreEvent);
})
+ template <class TEventPtr> // TEventPtr: event handle type; only ev->Sender is read here
+ void IgnoreEvent(TEventPtr& ev) { // drops the event: nothing is done with it besides the log line below
+ SRC_LOG_D("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender); // name() describes the handle type TEventPtr; its text is implementation-defined
+ }
+
+ template <class TEventPtr> // TEventPtr: event handle type; ev->Sender and ev->Cookie are read here
+ void ReplyNoSession(TEventPtr& ev) { // does not process the event: logs it, then calls SendNoSession for its sender
+ SRC_LOG_D("Ignore (no session) " << typeid(TEventPtr).name() << " from " << ev->Sender); // name() describes the handle type TEventPtr; its text is implementation-defined
+ SendNoSession(ev->Sender, ev->Cookie); // sender and cookie of the incoming event are passed through unchanged
+ }
+
static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR";
void CommitState(const NDqProto::TCheckpoint& checkpoint) override;
@@ -485,9 +523,13 @@ TDqPqRdReadActor::TDqPqRdReadActor(
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
, MaxBufferSize(bufferSize)
{
- if (Parent == this) {
- State = EState::START_CLUSTER_DISCOVERY;
+
+ SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields())
+ << ", partitions: " << JoinSeq(',', GetPartitionsToRead()));
+ if (Parent != this) {
+ return;
}
+ State = EState::START_CLUSTER_DISCOVERY;
const auto programBuilder = std::make_unique<TProgramBuilder>(typeEnv, *holderFactory.GetFunctionRegistry());
// Parse output schema (expected struct output type)
@@ -510,8 +552,6 @@ TDqPqRdReadActor::TDqPqRdReadActor(
DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(InputDataType);
IngressStats.Level = statsLevel;
- SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields())
- << ", partitions: " << JoinSeq(',', GetPartitionsToRead()));
}
void TDqPqRdReadActor::Init() {
@@ -663,15 +703,18 @@ void TDqPqRdReadActor::StopSession(TSession& sessionInfo) {
// IActor & IDqComputeActorAsyncInput
void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor
SRC_LOG_I("PassAway");
+ Become(&TDqPqRdReadActor::IgnoreState);
PrintInternalState();
for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
StopSession(sessionInfo);
}
for (auto& clusterState : Clusters) {
- if (clusterState.Child == this) {
+ auto child = clusterState.Child;
+ if (child == this) {
continue;
}
- Send(clusterState.ChildId, new NActors::TEvents::TEvPoison);
+ // all actors are on same mailbox, safe to call
+ child->PassAway();
}
Clusters.clear();
FederatedTopicClient.Reset();
@@ -681,6 +724,7 @@ void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor
}
i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& /*watermark*/, bool&, i64 freeSpace) {
+ Counters.GetAsyncInputData++;
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);
Init();
Metrics.InFlyAsyncInputData->Set(0);
@@ -717,6 +761,7 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
continue;
}
for (auto& [rowDispatcherActorId, sessionInfo] : child->Sessions) {
+ // all actors are on same mailbox, safe to call
child->TrySendGetNextBatch(sessionInfo);
}
}
@@ -769,6 +814,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
SRC_LOG_T("Received TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie);
Counters.Statistics++;
CpuMicrosec += ev->Get()->Record.GetCpuMicrosec();
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
if (Parent != this) {
Parent->CpuMicrosec += ev->Get()->Record.GetCpuMicrosec();
}
@@ -872,7 +918,7 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
SRC_LOG_D("TEvCoordinatorChanged, new coordinator " << ev->Get()->CoordinatorActorId);
- Counters.GetAsyncInputData++;
+ Counters.CoordinatorChanged++;
if (CoordinatorActorId
&& CoordinatorActorId == ev->Get()->CoordinatorActorId) {
@@ -900,6 +946,7 @@ void TDqPqRdReadActor::ScheduleProcessState() {
void TDqPqRdReadActor::ReInit(const TString& reason) {
SRC_LOG_I("ReInit state, reason " << reason);
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
Parent->Metrics.ReInit->Inc();
State = EState::WAIT_COORDINATOR_ID;
@@ -915,7 +962,7 @@ void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issu
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) {
SRC_LOG_I("Received TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie);
- Counters.CoordinatorChanged++;
+ Counters.CoordinatorResult++;
if (ev->Cookie != CoordinatorRequestCookie) {
SRC_LOG_W("Ignore TEvCoordinatorResult. wrong cookie");
return;
@@ -991,6 +1038,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << partitionId)});
return;
}
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
Parent->Metrics.InFlyGetNextBatch->Set(0);
if (ev->Get()->Record.GetMessages().empty()) {
return;
@@ -1068,14 +1116,21 @@ void TDqPqRdReadActor::PrintInternalState() {
TString TDqPqRdReadActor::GetInternalState() {
TStringStream str;
- str << LogPrefix << "State: used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData << "\n";
- str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
+ str << LogPrefix << "State:";
+ str << " used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer.size()
+ << " state " << static_cast<ui64>(State)
+ << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData
+ << "\n";
+ str << "Counters:"
+ << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
<< " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived
<< " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected
<< " NodeConnected " << Counters.NodeConnected << " Undelivered " << Counters.Undelivered << " Retry " << Counters.Retry
<< " PrivateHeartbeat " << Counters.PrivateHeartbeat << " SessionClosed " << Counters.SessionClosed << " Pong " << Counters.Pong
<< " Heartbeat " << Counters.Heartbeat << " PrintState " << Counters.PrintState << " ProcessState " << Counters.ProcessState
- << " NotifyCA " << Parent->Counters.NotifyCA << "\n";
+ << " GetAsyncInputData " << Parent->Counters.GetAsyncInputData
+ << " NotifyCA " << Parent->Counters.NotifyCA
+ << "\n";
for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
str << " " << rowDispatcherActorId << " status " << static_cast<ui64>(sessionInfo.Status)
@@ -1365,6 +1420,7 @@ void TDqPqRdReadActor::StartCluster(ui32 clusterIndex) {
PqGateway,
this,
TString(Clusters[clusterIndex].Info.Name));
+ Clusters[clusterIndex].Child = actor;
Clusters[clusterIndex].ChildId = RegisterWithSameMailbox(actor);
actor->Init();
actor->InitChild();