diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-07-21 18:29:41 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-07-21 18:29:41 +0300 |
commit | b839916a3502d942ce4844416a56f1e0c8856119 (patch) | |
tree | 75ff5c9c148c92e3b2691ab97d8dd8c628558fe9 | |
parent | b48cd8291c9b1dce6da647f01b5461347f3d7efb (diff) | |
download | ydb-b839916a3502d942ce4844416a56f1e0c8856119.tar.gz |
Fix infinite loop in async CA
Fix size
Little things
Little things
3 files changed, 86 insertions, 109 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 3456cf0bc9..9666cc9b20 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -157,7 +157,7 @@ private: ); outputChannel.PopStarted = true; - ProcessOutputsState.Inflight ++; + ProcessOutputsState.Inflight++; if (toSend <= 0) { if (Y_UNLIKELY(outputChannel.Stats)) { outputChannel.Stats->BlockedByCapacity++; @@ -196,7 +196,7 @@ private: ); sinkInfo.PopStarted = true; - ProcessOutputsState.Inflight ++; + ProcessOutputsState.Inflight++; sinkInfo.FreeSpaceBeforeSend = sinkFreeSpaceBeforeSend; Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkFreeSpaceBeforeSend)); } @@ -535,7 +535,7 @@ private: } sinkInfo.PopStarted = false; - ProcessOutputsState.Inflight --; + ProcessOutputsState.Inflight--; ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished; { @@ -545,7 +545,7 @@ private: } Y_VERIFY(batch.empty()); - CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier"); + CA_LOG_D("Sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier"); CA_LOG_D("Drain sink " << outputIndex << ". Free space decreased: " << (sinkInfo.FreeSpaceBeforeSend - sinkInfo.AsyncOutput->GetFreeSpace()) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 513c002e2f..7eafdbb4f6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1469,8 +1469,7 @@ protected: void PollAsyncInput(TAsyncInputInfoBase& info, ui64 inputIndex) { Y_VERIFY(!TaskRunner || info.Buffer); if (info.Finished) { - const ui64 indexForLogging = inputIndex; // Crutch for clang - CA_LOG_D("Skip polling async input[" << indexForLogging << "]: finished"); + CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished"); return; } const i64 freeSpace = AsyncIoFreeSpace(info); diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 1e944bcfc2..84a97e14d8 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -50,57 +50,52 @@ public: { } STFUNC(Handler) { + Y_UNUSED(ctx); try { switch (ev->GetTypeRewrite()) { cFunc(NActors::TEvents::TEvPoison::EventType, TLocalTaskRunnerActor::PassAway); - HFunc(TEvTaskRunnerCreate, OnDqTask); - HFunc(TEvContinueRun, OnContinueRun); - HFunc(TEvPop, OnChannelPop); - HFunc(TEvPush, OnChannelPush); - HFunc(TEvSinkPop, OnSinkPop); - HFunc(TEvLoadTaskRunnerFromState, OnLoadTaskRunnerFromState); - HFunc(TEvStatistics, OnStatisticsRequest); + hFunc(TEvTaskRunnerCreate, OnDqTask); + hFunc(TEvContinueRun, OnContinueRun); + hFunc(TEvPop, OnChannelPop); + hFunc(TEvPush, OnChannelPush); + hFunc(TEvSinkPop, OnSinkPop); + hFunc(TEvLoadTaskRunnerFromState, OnLoadTaskRunnerFromState); + hFunc(TEvStatistics, OnStatisticsRequest); default: { Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, ev->GetTypeRewrite()); } } } catch (const NKikimr::TMemoryLimitExceededException& e) { - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - GetError(e).Release(), - 0, - ev->Cookie)); + Send( + ev->Sender, + GetError(e).Release(), + 0, + ev->Cookie); } catch (...) { - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - GetError(CurrentExceptionMessage()).Release(), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + GetError(CurrentExceptionMessage()).Release(), + /*flags=*/0, + ev->Cookie); } } private: - void OnStatisticsRequest(TEvStatistics::TPtr& ev, const TActorContext& ctx) { + void OnStatisticsRequest(TEvStatistics::TPtr& ev) { TaskRunner->UpdateStats(); THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats; for (const auto sinkId : ev->Get()->SinkIds) { sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats(); } ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats)); - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - ev->Release().Release(), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + ev->Release().Release(), + /*flags=*/0, + ev->Cookie); } - void OnLoadTaskRunnerFromState(TEvLoadTaskRunnerFromState::TPtr& ev, const TActorContext& ctx) { + void OnLoadTaskRunnerFromState(TEvLoadTaskRunnerFromState::TPtr& ev) { TMaybe<TString> error = Nothing(); try { auto guard = TaskRunner->BindAllocator(); @@ -108,13 +103,11 @@ private: } catch (const std::exception& e) { error = e.what(); } - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - new TEvLoadTaskRunnerFromStateDone(std::move(error)), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + new TEvLoadTaskRunnerFromStateDone(std::move(error)), + /*flags=*/0, + ev->Cookie); } void PassAway() override { @@ -141,14 +134,12 @@ private: return true; } - void DoContinueRun(TEvContinueRun::TPtr& ev, const TActorContext& ctx) { + void DoContinueRun(TEvContinueRun::TPtr& ev) { auto guard = TaskRunner->BindAllocator(MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : ev->Get()->MemLimit); auto inputMap = ev->Get()->AskFreeSpace ? Inputs : ev->Get()->InputChannels; - auto& sourcesMap = Sources; - NYql::NDq::ERunStatus res = ERunStatus::Finished; THashMap<ui32, ui64> inputChannelFreeSpace; THashMap<ui32, ui64> sourcesFreeSpace; @@ -161,7 +152,7 @@ private: inputChannelFreeSpace[channelId] = TaskRunner->GetInputChannel(channelId)->GetFreeSpace(); } - for (auto& index : sourcesMap) { + for (auto& index : Sources) { sourcesFreeSpace[index] = TaskRunner->GetSource(index)->GetFreeSpace(); } } @@ -170,7 +161,6 @@ private: if ((res == ERunStatus::PendingInput || res == ERunStatus::Finished) && ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) { mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>(); try { - auto guard = TaskRunner->BindAllocator(); mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData(); data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); @@ -193,28 +183,26 @@ private: MemoryQuota->TryShrinkMemory(guard.GetMutex()); } - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - new TEvTaskRunFinished( - res, - std::move(inputChannelFreeSpace), - std::move(sourcesFreeSpace), - {}, - MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(), - MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0, - std::move(mkqlProgramState), - ev->Get()->CheckpointRequest.Defined()), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + new TEvTaskRunFinished( + res, + std::move(inputChannelFreeSpace), + std::move(sourcesFreeSpace), + {}, + MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(), + MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0, + std::move(mkqlProgramState), + ev->Get()->CheckpointRequest.Defined()), + /*flags=*/0, + ev->Cookie); } - void OnContinueRun(TEvContinueRun::TPtr& ev, const TActorContext& ctx) { - DoContinueRun(ev, ctx); + void OnContinueRun(TEvContinueRun::TPtr& ev) { + DoContinueRun(ev); } - void OnChannelPush(TEvPush::TPtr& ev, const NActors::TActorContext& ctx) { + void OnChannelPush(TEvPush::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); auto hasData = ev->Get()->HasData; auto finish = ev->Get()->Finish; @@ -241,13 +229,11 @@ private: } // run - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - new TEvContinueRun(channelId, freeSpace), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + new TEvContinueRun(channelId, freeSpace), + /*flags=*/0, + ev->Cookie); } void AsyncInputPush( @@ -257,22 +243,19 @@ private: i64 space, bool finish) override { - auto& ctx = NActors::TlsActivationContext; auto source = TaskRunner->GetSource(index); source->Push(std::move(batch), space); if (finish) { source->Finish(); } - ctx->Send( - new IEventHandle( - ParentId, - SelfId(), - new TEvAsyncInputPushFinished(index), - /*flags=*/0, - cookie)); + Send( + ParentId, + new TEvAsyncInputPushFinished(index), + /*flags=*/0, + cookie); } - void OnChannelPop(TEvPop::TPtr& ev, const NActors::TActorContext& ctx) { + void OnChannelPop(TEvPop::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); auto channelId = ev->Get()->ChannelId; @@ -313,20 +296,18 @@ private: } } - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - new TEvChannelPopFinished( - channelId, - std::move(chunks), - std::move(checkpoint), - isFinished, - changed, - {}, - TDqTaskRunnerStatsView(TaskRunner->GetStats())), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + new TEvChannelPopFinished( + channelId, + std::move(chunks), + std::move(checkpoint), + isFinished, + changed, + {}, + TDqTaskRunnerStatsView(TaskRunner->GetStats())), + /*flags=*/0, + ev->Cookie); } void ResumeInputs() { @@ -335,8 +316,7 @@ private: } } - void OnSinkPop(TEvSinkPop::TPtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ctx); + void OnSinkPop(TEvSinkPop::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); auto sink = TaskRunner->GetSink(ev->Get()->Index); @@ -349,19 +329,19 @@ private: if (ev->Get()->Size > 0) { size = sink->Pop(batch, ev->Get()->Size); } - bool hasCheckpoint = sink->Pop(checkpoint); + const bool hasCheckpoint = sink->Pop(checkpoint); if (hasCheckpoint) { checkpointSize = checkpoint.ByteSize(); maybeCheckpoint.ConstructInPlace(std::move(checkpoint)); ResumeInputs(); } - auto finished = sink->IsFinished(); - bool changed = finished || ev->Get()->Size > 0 || hasCheckpoint; + const bool finished = sink->IsFinished(); + const bool changed = finished || size > 0 || hasCheckpoint; Parent->SinkSend(ev->Get()->Index, std::move(batch), std::move(maybeCheckpoint), checkpointSize, size, finished, changed); } - void OnDqTask(TEvTaskRunnerCreate::TPtr& ev, const NActors::TActorContext& ctx) { + void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) { ParentId = ev->Sender; TaskRunner = Factory(ev->Get()->Task, [this](const TString& message) { LOG_D(message); @@ -392,13 +372,11 @@ private: TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory()); - ctx.Send( - new IEventHandle( - ev->Sender, - SelfId(), - event.Release(), - /*flags=*/0, - ev->Cookie)); + Send( + ev->Sender, + event.Release(), + /*flags=*/0, + ev->Cookie); } THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) { |