diff options
author | yumkam <yumkam7@ydb.tech> | 2025-04-15 19:59:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-15 19:59:36 +0300 |
commit | 048e65e557439b62712cba204836b98bbb3933cc (patch) | |
tree | e5a3a08e3892c71ce3487aaa8360e0470695d892 | |
parent | 476779e1e222b46d41445045f2c60f81a25d7a5c (diff) | |
download | ydb-048e65e557439b62712cba204836b98bbb3933cc.tar.gz |
pq rd: fix use-after-free with logbroker federation (v2) (#17189)
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 80 |
1 files changed, 68 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index fd00b1df907..ca03490ac75 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -234,6 +234,7 @@ private: std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher) std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker; + // Set on Parent ui64 CpuMicrosec = 0; // Set on both Parent (cumulative) and Childern (separate) @@ -377,10 +378,47 @@ public: hFunc(TEvPrivate::TEvRefreshClusters, Handle); hFunc(TEvPrivate::TEvReceivedClusters, Handle); hFunc(TEvPrivate::TEvDescribeTopicResult, Handle); + }) - cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + STRICT_STFUNC(IgnoreState, { + // ignore all events except for retry queue + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, IgnoreEvent); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorResult, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvMessageBatch, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvStartSessionAck, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession); + + hFunc(NActors::TEvents::TEvPong, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); + hFunc(NActors::TEvents::TEvUndelivered, Handle); + hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle); + hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); + + // ignore all row dispatcher events + hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, ReplyNoSession); + hFunc(TEvPrivate::TEvPrintState, IgnoreEvent); + hFunc(TEvPrivate::TEvProcessState, IgnoreEvent); + hFunc(TEvPrivate::TEvNotifyCA, IgnoreEvent); + hFunc(TEvPrivate::TEvRefreshClusters, IgnoreEvent); + hFunc(TEvPrivate::TEvReceivedClusters, IgnoreEvent); + hFunc(TEvPrivate::TEvDescribeTopicResult, IgnoreEvent); }) + template <class TEventPtr> + void IgnoreEvent(TEventPtr& ev) { + SRC_LOG_D("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender); + } + + template <class TEventPtr> + void ReplyNoSession(TEventPtr& ev) { + SRC_LOG_D("Ignore (no session) " << typeid(TEventPtr).name() << " from " << ev->Sender); + SendNoSession(ev->Sender, ev->Cookie); + } + static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR"; void CommitState(const NDqProto::TCheckpoint& checkpoint) override; @@ -485,9 +523,13 @@ TDqPqRdReadActor::TDqPqRdReadActor( , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , MaxBufferSize(bufferSize) { - if (Parent == this) { - State = EState::START_CLUSTER_DISCOVERY; + + SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()) + << ", partitions: " << JoinSeq(',', GetPartitionsToRead())); + if (Parent != this) { + return; } + State = EState::START_CLUSTER_DISCOVERY; const auto programBuilder = std::make_unique<TProgramBuilder>(typeEnv, *holderFactory.GetFunctionRegistry()); // Parse output schema (expected struct output type) @@ -510,8 +552,6 @@ TDqPqRdReadActor::TDqPqRdReadActor( DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(InputDataType); IngressStats.Level = statsLevel; - SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()) - << ", partitions: " << JoinSeq(',', GetPartitionsToRead())); } void TDqPqRdReadActor::Init() { @@ -663,15 +703,18 @@ void TDqPqRdReadActor::StopSession(TSession& sessionInfo) { // IActor & IDqComputeActorAsyncInput void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor SRC_LOG_I("PassAway"); + Become(&TDqPqRdReadActor::IgnoreState); PrintInternalState(); for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { StopSession(sessionInfo); } for (auto& clusterState : Clusters) { - if (clusterState.Child == this) { + auto child = clusterState.Child; + if (child == this) { continue; } - Send(clusterState.ChildId, new NActors::TEvents::TEvPoison); + // all actors are on same mailbox, safe to call + child->PassAway(); } Clusters.clear(); FederatedTopicClient.Reset(); @@ -681,6 +724,7 @@ void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor } i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& /*watermark*/, bool&, i64 freeSpace) { + Counters.GetAsyncInputData++; SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); Init(); Metrics.InFlyAsyncInputData->Set(0); @@ -717,6 +761,7 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b continue; } for (auto& [rowDispatcherActorId, sessionInfo] : child->Sessions) { + // all actors are on same mailbox, safe to call child->TrySendGetNextBatch(sessionInfo); } } @@ -769,6 +814,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) { SRC_LOG_T("Received TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie); Counters.Statistics++; CpuMicrosec += ev->Get()->Record.GetCpuMicrosec(); + // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly if (Parent != this) { Parent->CpuMicrosec += ev->Get()->Record.GetCpuMicrosec(); } @@ -872,7 +918,7 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) { SRC_LOG_D("TEvCoordinatorChanged, new coordinator " << ev->Get()->CoordinatorActorId); - Counters.GetAsyncInputData++; + Counters.CoordinatorChanged++; if (CoordinatorActorId && CoordinatorActorId == ev->Get()->CoordinatorActorId) { @@ -900,6 +946,7 @@ void TDqPqRdReadActor::ScheduleProcessState() { void TDqPqRdReadActor::ReInit(const TString& reason) { SRC_LOG_I("ReInit state, reason " << reason); + // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly Parent->Metrics.ReInit->Inc(); State = EState::WAIT_COORDINATOR_ID; @@ -915,7 +962,7 @@ void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issu void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) { SRC_LOG_I("Received TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie); - Counters.CoordinatorChanged++; + Counters.CoordinatorResult++; if (ev->Cookie != CoordinatorRequestCookie) { SRC_LOG_W("Ignore TEvCoordinatorResult. wrong cookie"); return; @@ -991,6 +1038,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << partitionId)}); return; } + // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly Parent->Metrics.InFlyGetNextBatch->Set(0); if (ev->Get()->Record.GetMessages().empty()) { return; @@ -1068,14 +1116,21 @@ void TDqPqRdReadActor::PrintInternalState() { TString TDqPqRdReadActor::GetInternalState() { TStringStream str; - str << LogPrefix << "State: used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData << "\n"; - str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult + str << LogPrefix << "State:"; + str << " used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer.size() + << " state " << static_cast<ui64>(State) + << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData + << "\n"; + str << "Counters:" + << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult << " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived << " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected << " NodeConnected " << Counters.NodeConnected << " Undelivered " << Counters.Undelivered << " Retry " << Counters.Retry << " PrivateHeartbeat " << Counters.PrivateHeartbeat << " SessionClosed " << Counters.SessionClosed << " Pong " << Counters.Pong << " Heartbeat " << Counters.Heartbeat << " PrintState " << Counters.PrintState << " ProcessState " << Counters.ProcessState - << " NotifyCA " << Parent->Counters.NotifyCA << "\n"; + << " GetAsyncInputData " << Parent->Counters.GetAsyncInputData + << " NotifyCA " << Parent->Counters.NotifyCA + << "\n"; for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { str << " " << rowDispatcherActorId << " status " << static_cast<ui64>(sessionInfo.Status) @@ -1365,6 +1420,7 @@ void TDqPqRdReadActor::StartCluster(ui32 clusterIndex) { PqGateway, this, TString(Clusters[clusterIndex].Info.Name)); + Clusters[clusterIndex].Child = actor; Clusters[clusterIndex].ChildId = RegisterWithSameMailbox(actor); actor->Init(); actor->InitChild(); |