diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-24 14:27:12 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-24 14:27:12 +0300 |
commit | 05589f01efdc402ba5c36ec05da309b9d786e723 (patch) | |
tree | 9ce9a74ffd50f7fff5e919d37e6c1e486abe69a4 | |
parent | f472affccc4bcf066a6b6adbaf896c74050ac6ea (diff) | |
download | ydb-05589f01efdc402ba5c36ec05da309b9d786e723.tar.gz |
Split EvContinueRun with EvPushFinished
5 files changed, 61 insertions, 75 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 738d17a02e..aeaa4c4650 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -111,7 +111,7 @@ private: hFunc(NTaskRunnerActor::TEvAsyncInputPushFinished, OnAsyncInputPushFinished); hFunc(NTaskRunnerActor::TEvChannelPopFinished, OnPopFinished); hFunc(NTaskRunnerActor::TEvTaskRunnerCreateFinished, OnTaskRunnerCreated); - hFunc(NTaskRunnerActor::TEvContinueRun, OnContinueRun); // push finished + hFunc(NTaskRunnerActor::TEvPushFinished, OnPushFinished); hFunc(TEvDqCompute::TEvStateRequest, OnStateRequest); hFunc(NTaskRunnerActor::TEvStatistics, OnStatisticsResponse); hFunc(NTaskRunnerActor::TEvLoadTaskRunnerFromStateDone, OnTaskRunnerLoaded); @@ -332,12 +332,10 @@ private: channelData.GetChannelId(), std::move(*channelData.MutableData()), finished, - /* askFreeSpace = */ true, /* pauseAfterPush = */ channelData.HasCheckpoint()) : MakeHolder<NTaskRunnerActor::TEvPush>( channelData.GetChannelId(), finished, - /* askFreeSpace = */ true, /* pauseAfterPush = */ channelData.HasCheckpoint()); Send(TaskRunnerActorId, ev.Release(), 0, Cookie); @@ -674,7 +672,7 @@ private: return result; } - void OnContinueRun(NTaskRunnerActor::TEvContinueRun::TPtr& ev) { + void OnPushFinished(NTaskRunnerActor::TEvPushFinished::TPtr& ev) { auto it = TakeInputChannelDataRequests.find(ev->Cookie); YQL_ENSURE(it != TakeInputChannelDataRequests.end()); @@ -693,9 +691,7 @@ private: TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId); Y_VERIFY(inputChannel); - if (ev->Get()->FreeSpace) { - inputChannel->FreeSpace = *ev->Get()->FreeSpace; - } + inputChannel->FreeSpace = ev->Get()->FreeSpace; if (it->second.Ack) { Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace); diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index ab82376997..5d4ca7c182 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -23,29 +23,30 @@ namespace NTaskRunnerActor { struct TTaskRunnerEvents { enum { - ES_CREATE = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 20000, - ES_CREATE_FINISHED, + EvCreate = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 20000, + EvCreateFinished, // channel->Pop -> TEvChannelPopFinished - ES_POP, - ES_POP_FINISHED, - // channel->Push -> TEvContinueRun - ES_PUSH, - ES_CONTINUE_RUN, - // ES_CONTINUE_RUN -> TaskRunner->Run() -> TEvTaskRunFinished - ES_RUN_FINISHED, + EvPop, + EvPopFinished, + // channel->Push -> TEvPushFinished + EvPush, + EvPushFinished, + EvContinueRun, + // EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished + EvRunFinished, - ES_ASYNC_INPUT_PUSH, - ES_ASYNC_INPUT_PUSH_FINISHED, + EvAsyncInputPush, + EvAsyncInputPushFinished, - ES_SINK_POP, - ES_SINK_POP_FINISHED, + EvSinkPop, + EvSinkPopFinished, - ES_LOAD_TASK_RUNNER_FROM_STATE, - ES_LOAD_TASK_RUNNER_FROM_STATE_DONE, + EvLoadTaskRunnerFromState, + EvLoadTaskRunnerFromStateDone, - ES_STATISTICS, + EvStatistics, - ES_ERROR + EvError }; }; @@ -61,7 +62,7 @@ struct TTaskRunnerActorSensorEntry { using TTaskRunnerActorSensors = TVector<TTaskRunnerActorSensorEntry>; struct TEvError - : NActors::TEventLocal<TEvError, TTaskRunnerEvents::ES_ERROR> + : NActors::TEventLocal<TEvError, TTaskRunnerEvents::EvError> { struct TStatus { int ExitCode; @@ -90,7 +91,7 @@ struct TEvError }; struct TEvPop - : NActors::TEventLocal<TEvPop, TTaskRunnerEvents::ES_POP> + : NActors::TEventLocal<TEvPop, TTaskRunnerEvents::EvPop> { TEvPop() = default; TEvPop(ui32 channelId, bool wasFinished, i64 toPop) @@ -110,22 +111,20 @@ struct TEvPop }; struct TEvPush - : NActors::TEventLocal<TEvPush, TTaskRunnerEvents::ES_PUSH> + : NActors::TEventLocal<TEvPush, TTaskRunnerEvents::EvPush> { TEvPush() = default; - TEvPush(ui32 channelId, bool finish = true, bool askFreeSpace = false, bool pauseAfterPush = false, bool isOut = false) + TEvPush(ui32 channelId, bool finish = true, bool pauseAfterPush = false, bool isOut = false) : ChannelId(channelId) , HasData(false) , Finish(finish) - , AskFreeSpace(askFreeSpace) , PauseAfterPush(pauseAfterPush) , IsOut(isOut) { } - TEvPush(ui32 channelId, NDqProto::TData&& data, bool finish = false, bool askFreeSpace = false, bool pauseAfterPush = false) + TEvPush(ui32 channelId, NDqProto::TData&& data, bool finish = false, bool pauseAfterPush = false) : ChannelId(channelId) , HasData(true) , Finish(finish) - , AskFreeSpace(askFreeSpace) , Data(std::move(data)) , PauseAfterPush(pauseAfterPush) { } @@ -133,14 +132,27 @@ struct TEvPush const ui32 ChannelId; const bool HasData = false; const bool Finish = false; - const bool AskFreeSpace = false; NDqProto::TData Data; bool PauseAfterPush = false; const bool IsOut = false; }; +struct TEvPushFinished + : NActors::TEventLocal<TEvPushFinished, TTaskRunnerEvents::EvPushFinished> { + + TEvPushFinished() = default; + + TEvPushFinished(ui32 channelId, ui64 freeSpace) + : ChannelId(channelId) + , FreeSpace(freeSpace) + { } + + ui32 ChannelId; + ui64 FreeSpace; +}; + struct TEvTaskRunnerCreate - : NActors::TEventLocal<TEvTaskRunnerCreate, TTaskRunnerEvents::ES_CREATE> + : NActors::TEventLocal<TEvTaskRunnerCreate, TTaskRunnerEvents::EvCreate> { TEvTaskRunnerCreate() = default; TEvTaskRunnerCreate( @@ -161,7 +173,7 @@ struct TEvTaskRunnerCreate }; struct TEvTaskRunnerCreateFinished - : NActors::TEventLocal<TEvTaskRunnerCreateFinished, TTaskRunnerEvents::ES_CREATE_FINISHED> + : NActors::TEventLocal<TEvTaskRunnerCreateFinished, TTaskRunnerEvents::EvCreateFinished> { TEvTaskRunnerCreateFinished() = default; @@ -188,7 +200,7 @@ struct TEvTaskRunnerCreateFinished }; struct TEvTaskRunFinished - : NActors::TEventLocal<TEvTaskRunFinished, TTaskRunnerEvents::ES_RUN_FINISHED> + : NActors::TEventLocal<TEvTaskRunFinished, TTaskRunnerEvents::EvRunFinished> { TEvTaskRunFinished() = default; TEvTaskRunFinished( @@ -228,7 +240,7 @@ struct TEvTaskRunFinished }; struct TEvChannelPopFinished - : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> + : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::EvPopFinished> { TEvChannelPopFinished() = default; @@ -295,7 +307,7 @@ struct TCheckpointRequest { }; struct TEvContinueRun - : NActors::TEventLocal<TEvContinueRun, TTaskRunnerEvents::ES_CONTINUE_RUN> { + : NActors::TEventLocal<TEvContinueRun, TTaskRunnerEvents::EvContinueRun> { TEvContinueRun() = default; @@ -304,32 +316,21 @@ struct TEvContinueRun TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly ) - : ChannelId(0) - , MemLimit(0) + : MemLimit(0) , WatermarkRequest(std::move(watermarkRequest)) , CheckpointRequest(std::move(checkpointRequest)) , CheckpointOnly(checkpointOnly) { } - TEvContinueRun(ui32 channelId, TMaybe<ui64> freeSpace) - : ChannelId(channelId) - , AskFreeSpace(false) - , MemLimit(0) - , FreeSpace(freeSpace) - { } - TEvContinueRun(THashSet<ui32>&& inputChannels, ui64 memLimit) - : ChannelId(0) - , AskFreeSpace(false) + : AskFreeSpace(false) , InputChannels(std::move(inputChannels)) , MemLimit(memLimit) { } - ui32 ChannelId; bool AskFreeSpace = true; const THashSet<ui32> InputChannels; ui64 MemLimit; - TMaybe<ui64> FreeSpace; TMaybe<TWatermarkRequest> WatermarkRequest = Nothing(); TMaybe<TCheckpointRequest> CheckpointRequest = Nothing(); bool CheckpointOnly = false; @@ -338,7 +339,7 @@ struct TEvContinueRun }; struct TEvAsyncInputPushFinished - : NActors::TEventLocal<TEvAsyncInputPushFinished, TTaskRunnerEvents::ES_ASYNC_INPUT_PUSH_FINISHED> + : NActors::TEventLocal<TEvAsyncInputPushFinished, TTaskRunnerEvents::EvAsyncInputPushFinished> { TEvAsyncInputPushFinished() = default; TEvAsyncInputPushFinished(ui64 index, i64 freeSpaceLeft) @@ -351,7 +352,7 @@ struct TEvAsyncInputPushFinished }; struct TEvSinkPop - : NActors::TEventLocal<TEvSinkPop, TTaskRunnerEvents::ES_SINK_POP> + : NActors::TEventLocal<TEvSinkPop, TTaskRunnerEvents::EvSinkPop> { TEvSinkPop() = default; TEvSinkPop(ui64 index, i64 size) @@ -364,7 +365,7 @@ struct TEvSinkPop }; struct TEvSinkPopFinished - : NActors::TEventLocal<TEvSinkPopFinished, TTaskRunnerEvents::ES_SINK_POP_FINISHED> + : NActors::TEventLocal<TEvSinkPopFinished, TTaskRunnerEvents::EvSinkPopFinished> { TEvSinkPopFinished() = default; TEvSinkPopFinished( @@ -391,19 +392,19 @@ struct TEvSinkPopFinished bool Changed; }; -struct TEvLoadTaskRunnerFromState : NActors::TEventLocal<TEvLoadTaskRunnerFromState, TTaskRunnerEvents::ES_LOAD_TASK_RUNNER_FROM_STATE>{ +struct TEvLoadTaskRunnerFromState : NActors::TEventLocal<TEvLoadTaskRunnerFromState, TTaskRunnerEvents::EvLoadTaskRunnerFromState>{ TEvLoadTaskRunnerFromState(TString&& blob) : Blob(std::move(blob)) {} TMaybe<TString> Blob; }; -struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFromStateDone, TTaskRunnerEvents::ES_LOAD_TASK_RUNNER_FROM_STATE_DONE>{ +struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFromStateDone, TTaskRunnerEvents::EvLoadTaskRunnerFromStateDone>{ TEvLoadTaskRunnerFromStateDone(TMaybe<TString> error) : Error(error) {} TMaybe<TString> Error; }; -struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::ES_STATISTICS> +struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::EvStatistics> { explicit TEvStatistics(TVector<ui32>&& sinkIds, TVector<ui32>&& inputTransformIds) : SinkIds(std::move(sinkIds)) diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index e89ba06692..763c458864 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -143,7 +143,7 @@ private: return true; } - void DoContinueRun(TEvContinueRun::TPtr& ev) { + void OnContinueRun(TEvContinueRun::TPtr& ev) { auto guard = TaskRunner->BindAllocator(MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : ev->Get()->MemLimit); auto inputMap = ev->Get()->AskFreeSpace ? Inputs @@ -253,15 +253,10 @@ private: ev->Cookie); } - void OnContinueRun(TEvContinueRun::TPtr& ev) { - DoContinueRun(ev); - } - void OnChannelPush(TEvPush::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); auto hasData = ev->Get()->HasData; auto finish = ev->Get()->Finish; - auto askFreeSpace = ev->Get()->AskFreeSpace; auto channelId = ev->Get()->ChannelId; auto data = ev->Get()->Data; if (ev->Get()->IsOut) { @@ -269,14 +264,11 @@ private: TaskRunner->GetOutputChannel(channelId)->Finish(); return; } - TMaybe<ui64> freeSpace; auto inputChannel = TaskRunner->GetInputChannel(channelId); if (hasData) { inputChannel->Push(std::move(data)); } - if (askFreeSpace) { - freeSpace = inputChannel->GetFreeSpace(); - } + const ui64 freeSpace = inputChannel->GetFreeSpace(); if (finish) { inputChannel->Finish(); } @@ -287,7 +279,7 @@ private: // run Send( ev->Sender, - new TEvContinueRun(channelId, freeSpace), + new TEvPushFinished(channelId, freeSpace), /*flags=*/0, ev->Cookie); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 2596701c45..b329f3e347 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -133,7 +133,7 @@ private: // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor HFunc(TEvDqFailure, OnError); // received from this actor itself - HFunc(TEvContinueRun, OnContinueRun); + HFunc(TEvPushFinished, OnPushFinished); cFunc(TEvents::TEvWakeup::EventType, OnWakeup); hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived); @@ -205,7 +205,7 @@ private: Send(Executer, std::move(ev)); } - void OnContinueRun(TEvContinueRun::TPtr&, const TActorContext& ctx) { + void OnPushFinished(TEvPushFinished::TPtr&, const TActorContext& ctx) { Run(ctx); } diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 0b375d1544..e77eb6b896 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -201,20 +201,17 @@ private: auto selfId = SelfId(); auto hasData = ev->Get()->HasData; auto finish = ev->Get()->Finish; - auto askFreeSpace = ev->Get()->AskFreeSpace; auto channelId = ev->Get()->ChannelId; auto cookie = ev->Cookie; auto data = ev->Get()->Data; - Invoker->Invoke([hasData, selfId, cookie, askFreeSpace, finish, channelId, taskRunner=TaskRunner, data, actorSystem, replyTo, settings=Settings, stageId=StageId] () mutable { + Invoker->Invoke([hasData, selfId, cookie, finish, channelId, taskRunner=TaskRunner, data, actorSystem, replyTo, settings=Settings, stageId=StageId] () mutable { try { // todo:(whcrc) finish output channel? ui64 freeSpace = 0; if (hasData) { // auto guard = taskRunner->BindAllocator(); // only for local mode taskRunner->GetInputChannel(channelId)->Push(std::move(data)); - if (askFreeSpace) { - freeSpace = taskRunner->GetInputChannel(channelId)->GetFreeSpace(); - } + freeSpace = taskRunner->GetInputChannel(channelId)->GetFreeSpace(); } if (finish) { taskRunner->GetInputChannel(channelId)->Finish(); @@ -225,7 +222,7 @@ private: new IEventHandle( replyTo, selfId, - new TEvContinueRun(channelId, freeSpace), + new TEvPushFinished(channelId, freeSpace), /*flags=*/0, cookie)); } catch (...) { |