aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-01-24 14:27:12 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-01-24 14:27:12 +0300
commit05589f01efdc402ba5c36ec05da309b9d786e723 (patch)
tree9ce9a74ffd50f7fff5e919d37e6c1e486abe69a4
parentf472affccc4bcf066a6b6adbaf896c74050ac6ea (diff)
downloadydb-05589f01efdc402ba5c36ec05da309b9d786e723.tar.gz
Split EvContinueRun with EvPushFinished
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp10
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h99
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp14
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp4
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp9
5 files changed, 61 insertions, 75 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 738d17a02e..aeaa4c4650 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -111,7 +111,7 @@ private:
hFunc(NTaskRunnerActor::TEvAsyncInputPushFinished, OnAsyncInputPushFinished);
hFunc(NTaskRunnerActor::TEvChannelPopFinished, OnPopFinished);
hFunc(NTaskRunnerActor::TEvTaskRunnerCreateFinished, OnTaskRunnerCreated);
- hFunc(NTaskRunnerActor::TEvContinueRun, OnContinueRun); // push finished
+ hFunc(NTaskRunnerActor::TEvPushFinished, OnPushFinished);
hFunc(TEvDqCompute::TEvStateRequest, OnStateRequest);
hFunc(NTaskRunnerActor::TEvStatistics, OnStatisticsResponse);
hFunc(NTaskRunnerActor::TEvLoadTaskRunnerFromStateDone, OnTaskRunnerLoaded);
@@ -332,12 +332,10 @@ private:
channelData.GetChannelId(),
std::move(*channelData.MutableData()),
finished,
- /* askFreeSpace = */ true,
/* pauseAfterPush = */ channelData.HasCheckpoint())
: MakeHolder<NTaskRunnerActor::TEvPush>(
channelData.GetChannelId(),
finished,
- /* askFreeSpace = */ true,
/* pauseAfterPush = */ channelData.HasCheckpoint());
Send(TaskRunnerActorId, ev.Release(), 0, Cookie);
@@ -674,7 +672,7 @@ private:
return result;
}
- void OnContinueRun(NTaskRunnerActor::TEvContinueRun::TPtr& ev) {
+ void OnPushFinished(NTaskRunnerActor::TEvPushFinished::TPtr& ev) {
auto it = TakeInputChannelDataRequests.find(ev->Cookie);
YQL_ENSURE(it != TakeInputChannelDataRequests.end());
@@ -693,9 +691,7 @@ private:
TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId);
Y_VERIFY(inputChannel);
- if (ev->Get()->FreeSpace) {
- inputChannel->FreeSpace = *ev->Get()->FreeSpace;
- }
+ inputChannel->FreeSpace = ev->Get()->FreeSpace;
if (it->second.Ack) {
Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace);
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index ab82376997..5d4ca7c182 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -23,29 +23,30 @@ namespace NTaskRunnerActor {
struct TTaskRunnerEvents {
enum {
- ES_CREATE = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 20000,
- ES_CREATE_FINISHED,
+ EvCreate = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 20000,
+ EvCreateFinished,
// channel->Pop -> TEvChannelPopFinished
- ES_POP,
- ES_POP_FINISHED,
- // channel->Push -> TEvContinueRun
- ES_PUSH,
- ES_CONTINUE_RUN,
- // ES_CONTINUE_RUN -> TaskRunner->Run() -> TEvTaskRunFinished
- ES_RUN_FINISHED,
+ EvPop,
+ EvPopFinished,
+ // channel->Push -> TEvPushFinished
+ EvPush,
+ EvPushFinished,
+ EvContinueRun,
+ // EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished
+ EvRunFinished,
- ES_ASYNC_INPUT_PUSH,
- ES_ASYNC_INPUT_PUSH_FINISHED,
+ EvAsyncInputPush,
+ EvAsyncInputPushFinished,
- ES_SINK_POP,
- ES_SINK_POP_FINISHED,
+ EvSinkPop,
+ EvSinkPopFinished,
- ES_LOAD_TASK_RUNNER_FROM_STATE,
- ES_LOAD_TASK_RUNNER_FROM_STATE_DONE,
+ EvLoadTaskRunnerFromState,
+ EvLoadTaskRunnerFromStateDone,
- ES_STATISTICS,
+ EvStatistics,
- ES_ERROR
+ EvError
};
};
@@ -61,7 +62,7 @@ struct TTaskRunnerActorSensorEntry {
using TTaskRunnerActorSensors = TVector<TTaskRunnerActorSensorEntry>;
struct TEvError
- : NActors::TEventLocal<TEvError, TTaskRunnerEvents::ES_ERROR>
+ : NActors::TEventLocal<TEvError, TTaskRunnerEvents::EvError>
{
struct TStatus {
int ExitCode;
@@ -90,7 +91,7 @@ struct TEvError
};
struct TEvPop
- : NActors::TEventLocal<TEvPop, TTaskRunnerEvents::ES_POP>
+ : NActors::TEventLocal<TEvPop, TTaskRunnerEvents::EvPop>
{
TEvPop() = default;
TEvPop(ui32 channelId, bool wasFinished, i64 toPop)
@@ -110,22 +111,20 @@ struct TEvPop
};
struct TEvPush
- : NActors::TEventLocal<TEvPush, TTaskRunnerEvents::ES_PUSH>
+ : NActors::TEventLocal<TEvPush, TTaskRunnerEvents::EvPush>
{
TEvPush() = default;
- TEvPush(ui32 channelId, bool finish = true, bool askFreeSpace = false, bool pauseAfterPush = false, bool isOut = false)
+ TEvPush(ui32 channelId, bool finish = true, bool pauseAfterPush = false, bool isOut = false)
: ChannelId(channelId)
, HasData(false)
, Finish(finish)
- , AskFreeSpace(askFreeSpace)
, PauseAfterPush(pauseAfterPush)
, IsOut(isOut)
{ }
- TEvPush(ui32 channelId, NDqProto::TData&& data, bool finish = false, bool askFreeSpace = false, bool pauseAfterPush = false)
+ TEvPush(ui32 channelId, NDqProto::TData&& data, bool finish = false, bool pauseAfterPush = false)
: ChannelId(channelId)
, HasData(true)
, Finish(finish)
- , AskFreeSpace(askFreeSpace)
, Data(std::move(data))
, PauseAfterPush(pauseAfterPush)
{ }
@@ -133,14 +132,27 @@ struct TEvPush
const ui32 ChannelId;
const bool HasData = false;
const bool Finish = false;
- const bool AskFreeSpace = false;
NDqProto::TData Data;
bool PauseAfterPush = false;
const bool IsOut = false;
};
+struct TEvPushFinished
+ : NActors::TEventLocal<TEvPushFinished, TTaskRunnerEvents::EvPushFinished> {
+
+ TEvPushFinished() = default;
+
+ TEvPushFinished(ui32 channelId, ui64 freeSpace)
+ : ChannelId(channelId)
+ , FreeSpace(freeSpace)
+ { }
+
+ ui32 ChannelId;
+ ui64 FreeSpace;
+};
+
struct TEvTaskRunnerCreate
- : NActors::TEventLocal<TEvTaskRunnerCreate, TTaskRunnerEvents::ES_CREATE>
+ : NActors::TEventLocal<TEvTaskRunnerCreate, TTaskRunnerEvents::EvCreate>
{
TEvTaskRunnerCreate() = default;
TEvTaskRunnerCreate(
@@ -161,7 +173,7 @@ struct TEvTaskRunnerCreate
};
struct TEvTaskRunnerCreateFinished
- : NActors::TEventLocal<TEvTaskRunnerCreateFinished, TTaskRunnerEvents::ES_CREATE_FINISHED>
+ : NActors::TEventLocal<TEvTaskRunnerCreateFinished, TTaskRunnerEvents::EvCreateFinished>
{
TEvTaskRunnerCreateFinished() = default;
@@ -188,7 +200,7 @@ struct TEvTaskRunnerCreateFinished
};
struct TEvTaskRunFinished
- : NActors::TEventLocal<TEvTaskRunFinished, TTaskRunnerEvents::ES_RUN_FINISHED>
+ : NActors::TEventLocal<TEvTaskRunFinished, TTaskRunnerEvents::EvRunFinished>
{
TEvTaskRunFinished() = default;
TEvTaskRunFinished(
@@ -228,7 +240,7 @@ struct TEvTaskRunFinished
};
struct TEvChannelPopFinished
- : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED>
+ : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::EvPopFinished>
{
TEvChannelPopFinished() = default;
@@ -295,7 +307,7 @@ struct TCheckpointRequest {
};
struct TEvContinueRun
- : NActors::TEventLocal<TEvContinueRun, TTaskRunnerEvents::ES_CONTINUE_RUN> {
+ : NActors::TEventLocal<TEvContinueRun, TTaskRunnerEvents::EvContinueRun> {
TEvContinueRun() = default;
@@ -304,32 +316,21 @@ struct TEvContinueRun
TMaybe<TCheckpointRequest>&& checkpointRequest,
bool checkpointOnly
)
- : ChannelId(0)
- , MemLimit(0)
+ : MemLimit(0)
, WatermarkRequest(std::move(watermarkRequest))
, CheckpointRequest(std::move(checkpointRequest))
, CheckpointOnly(checkpointOnly)
{ }
- TEvContinueRun(ui32 channelId, TMaybe<ui64> freeSpace)
- : ChannelId(channelId)
- , AskFreeSpace(false)
- , MemLimit(0)
- , FreeSpace(freeSpace)
- { }
-
TEvContinueRun(THashSet<ui32>&& inputChannels, ui64 memLimit)
- : ChannelId(0)
- , AskFreeSpace(false)
+ : AskFreeSpace(false)
, InputChannels(std::move(inputChannels))
, MemLimit(memLimit)
{ }
- ui32 ChannelId;
bool AskFreeSpace = true;
const THashSet<ui32> InputChannels;
ui64 MemLimit;
- TMaybe<ui64> FreeSpace;
TMaybe<TWatermarkRequest> WatermarkRequest = Nothing();
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
@@ -338,7 +339,7 @@ struct TEvContinueRun
};
struct TEvAsyncInputPushFinished
- : NActors::TEventLocal<TEvAsyncInputPushFinished, TTaskRunnerEvents::ES_ASYNC_INPUT_PUSH_FINISHED>
+ : NActors::TEventLocal<TEvAsyncInputPushFinished, TTaskRunnerEvents::EvAsyncInputPushFinished>
{
TEvAsyncInputPushFinished() = default;
TEvAsyncInputPushFinished(ui64 index, i64 freeSpaceLeft)
@@ -351,7 +352,7 @@ struct TEvAsyncInputPushFinished
};
struct TEvSinkPop
- : NActors::TEventLocal<TEvSinkPop, TTaskRunnerEvents::ES_SINK_POP>
+ : NActors::TEventLocal<TEvSinkPop, TTaskRunnerEvents::EvSinkPop>
{
TEvSinkPop() = default;
TEvSinkPop(ui64 index, i64 size)
@@ -364,7 +365,7 @@ struct TEvSinkPop
};
struct TEvSinkPopFinished
- : NActors::TEventLocal<TEvSinkPopFinished, TTaskRunnerEvents::ES_SINK_POP_FINISHED>
+ : NActors::TEventLocal<TEvSinkPopFinished, TTaskRunnerEvents::EvSinkPopFinished>
{
TEvSinkPopFinished() = default;
TEvSinkPopFinished(
@@ -391,19 +392,19 @@ struct TEvSinkPopFinished
bool Changed;
};
-struct TEvLoadTaskRunnerFromState : NActors::TEventLocal<TEvLoadTaskRunnerFromState, TTaskRunnerEvents::ES_LOAD_TASK_RUNNER_FROM_STATE>{
+struct TEvLoadTaskRunnerFromState : NActors::TEventLocal<TEvLoadTaskRunnerFromState, TTaskRunnerEvents::EvLoadTaskRunnerFromState>{
TEvLoadTaskRunnerFromState(TString&& blob) : Blob(std::move(blob)) {}
TMaybe<TString> Blob;
};
-struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFromStateDone, TTaskRunnerEvents::ES_LOAD_TASK_RUNNER_FROM_STATE_DONE>{
+struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFromStateDone, TTaskRunnerEvents::EvLoadTaskRunnerFromStateDone>{
TEvLoadTaskRunnerFromStateDone(TMaybe<TString> error) : Error(error) {}
TMaybe<TString> Error;
};
-struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::ES_STATISTICS>
+struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::EvStatistics>
{
explicit TEvStatistics(TVector<ui32>&& sinkIds, TVector<ui32>&& inputTransformIds)
: SinkIds(std::move(sinkIds))
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index e89ba06692..763c458864 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -143,7 +143,7 @@ private:
return true;
}
- void DoContinueRun(TEvContinueRun::TPtr& ev) {
+ void OnContinueRun(TEvContinueRun::TPtr& ev) {
auto guard = TaskRunner->BindAllocator(MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : ev->Get()->MemLimit);
auto inputMap = ev->Get()->AskFreeSpace
? Inputs
@@ -253,15 +253,10 @@ private:
ev->Cookie);
}
- void OnContinueRun(TEvContinueRun::TPtr& ev) {
- DoContinueRun(ev);
- }
-
void OnChannelPush(TEvPush::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
auto hasData = ev->Get()->HasData;
auto finish = ev->Get()->Finish;
- auto askFreeSpace = ev->Get()->AskFreeSpace;
auto channelId = ev->Get()->ChannelId;
auto data = ev->Get()->Data;
if (ev->Get()->IsOut) {
@@ -269,14 +264,11 @@ private:
TaskRunner->GetOutputChannel(channelId)->Finish();
return;
}
- TMaybe<ui64> freeSpace;
auto inputChannel = TaskRunner->GetInputChannel(channelId);
if (hasData) {
inputChannel->Push(std::move(data));
}
- if (askFreeSpace) {
- freeSpace = inputChannel->GetFreeSpace();
- }
+ const ui64 freeSpace = inputChannel->GetFreeSpace();
if (finish) {
inputChannel->Finish();
}
@@ -287,7 +279,7 @@ private:
// run
Send(
ev->Sender,
- new TEvContinueRun(channelId, freeSpace),
+ new TEvPushFinished(channelId, freeSpace),
/*flags=*/0,
ev->Cookie);
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 2596701c45..b329f3e347 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -133,7 +133,7 @@ private:
// between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field
HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor
HFunc(TEvDqFailure, OnError); // received from this actor itself
- HFunc(TEvContinueRun, OnContinueRun);
+ HFunc(TEvPushFinished, OnPushFinished);
cFunc(TEvents::TEvWakeup::EventType, OnWakeup);
hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
@@ -205,7 +205,7 @@ private:
Send(Executer, std::move(ev));
}
- void OnContinueRun(TEvContinueRun::TPtr&, const TActorContext& ctx) {
+ void OnPushFinished(TEvPushFinished::TPtr&, const TActorContext& ctx) {
Run(ctx);
}
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 0b375d1544..e77eb6b896 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -201,20 +201,17 @@ private:
auto selfId = SelfId();
auto hasData = ev->Get()->HasData;
auto finish = ev->Get()->Finish;
- auto askFreeSpace = ev->Get()->AskFreeSpace;
auto channelId = ev->Get()->ChannelId;
auto cookie = ev->Cookie;
auto data = ev->Get()->Data;
- Invoker->Invoke([hasData, selfId, cookie, askFreeSpace, finish, channelId, taskRunner=TaskRunner, data, actorSystem, replyTo, settings=Settings, stageId=StageId] () mutable {
+ Invoker->Invoke([hasData, selfId, cookie, finish, channelId, taskRunner=TaskRunner, data, actorSystem, replyTo, settings=Settings, stageId=StageId] () mutable {
try {
// todo:(whcrc) finish output channel?
ui64 freeSpace = 0;
if (hasData) {
// auto guard = taskRunner->BindAllocator(); // only for local mode
taskRunner->GetInputChannel(channelId)->Push(std::move(data));
- if (askFreeSpace) {
- freeSpace = taskRunner->GetInputChannel(channelId)->GetFreeSpace();
- }
+ freeSpace = taskRunner->GetInputChannel(channelId)->GetFreeSpace();
}
if (finish) {
taskRunner->GetInputChannel(channelId)->Finish();
@@ -225,7 +222,7 @@ private:
new IEventHandle(
replyTo,
selfId,
- new TEvContinueRun(channelId, freeSpace),
+ new TEvPushFinished(channelId, freeSpace),
/*flags=*/0,
cookie));
} catch (...) {