aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-07-21 18:29:41 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-07-21 18:29:41 +0300
commitb839916a3502d942ce4844416a56f1e0c8856119 (patch)
tree75ff5c9c148c92e3b2691ab97d8dd8c628558fe9
parentb48cd8291c9b1dce6da647f01b5461347f3d7efb (diff)
downloadydb-b839916a3502d942ce4844416a56f1e0c8856119.tar.gz
Fix infinite loop in async CA
Fix size Little things Little things
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h3
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp184
3 files changed, 86 insertions, 109 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 3456cf0bc9..9666cc9b20 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -157,7 +157,7 @@ private:
);
outputChannel.PopStarted = true;
- ProcessOutputsState.Inflight ++;
+ ProcessOutputsState.Inflight++;
if (toSend <= 0) {
if (Y_UNLIKELY(outputChannel.Stats)) {
outputChannel.Stats->BlockedByCapacity++;
@@ -196,7 +196,7 @@ private:
);
sinkInfo.PopStarted = true;
- ProcessOutputsState.Inflight ++;
+ ProcessOutputsState.Inflight++;
sinkInfo.FreeSpaceBeforeSend = sinkFreeSpaceBeforeSend;
Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkFreeSpaceBeforeSend));
}
@@ -535,7 +535,7 @@ private:
}
sinkInfo.PopStarted = false;
- ProcessOutputsState.Inflight --;
+ ProcessOutputsState.Inflight--;
ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
{
@@ -545,7 +545,7 @@ private:
}
Y_VERIFY(batch.empty());
- CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
+ CA_LOG_D("Sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
CA_LOG_D("Drain sink " << outputIndex
<< ". Free space decreased: " << (sinkInfo.FreeSpaceBeforeSend - sinkInfo.AsyncOutput->GetFreeSpace())
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 513c002e2f..7eafdbb4f6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1469,8 +1469,7 @@ protected:
void PollAsyncInput(TAsyncInputInfoBase& info, ui64 inputIndex) {
Y_VERIFY(!TaskRunner || info.Buffer);
if (info.Finished) {
- const ui64 indexForLogging = inputIndex; // Crutch for clang
- CA_LOG_D("Skip polling async input[" << indexForLogging << "]: finished");
+ CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished");
return;
}
const i64 freeSpace = AsyncIoFreeSpace(info);
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 1e944bcfc2..84a97e14d8 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -50,57 +50,52 @@ public:
{ }
STFUNC(Handler) {
+ Y_UNUSED(ctx);
try {
switch (ev->GetTypeRewrite()) {
cFunc(NActors::TEvents::TEvPoison::EventType, TLocalTaskRunnerActor::PassAway);
- HFunc(TEvTaskRunnerCreate, OnDqTask);
- HFunc(TEvContinueRun, OnContinueRun);
- HFunc(TEvPop, OnChannelPop);
- HFunc(TEvPush, OnChannelPush);
- HFunc(TEvSinkPop, OnSinkPop);
- HFunc(TEvLoadTaskRunnerFromState, OnLoadTaskRunnerFromState);
- HFunc(TEvStatistics, OnStatisticsRequest);
+ hFunc(TEvTaskRunnerCreate, OnDqTask);
+ hFunc(TEvContinueRun, OnContinueRun);
+ hFunc(TEvPop, OnChannelPop);
+ hFunc(TEvPush, OnChannelPush);
+ hFunc(TEvSinkPop, OnSinkPop);
+ hFunc(TEvLoadTaskRunnerFromState, OnLoadTaskRunnerFromState);
+ hFunc(TEvStatistics, OnStatisticsRequest);
default: {
Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, ev->GetTypeRewrite());
}
}
} catch (const NKikimr::TMemoryLimitExceededException& e) {
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- GetError(e).Release(),
- 0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ GetError(e).Release(),
+ 0,
+ ev->Cookie);
} catch (...) {
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- GetError(CurrentExceptionMessage()).Release(),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ GetError(CurrentExceptionMessage()).Release(),
+ /*flags=*/0,
+ ev->Cookie);
}
}
private:
- void OnStatisticsRequest(TEvStatistics::TPtr& ev, const TActorContext& ctx) {
+ void OnStatisticsRequest(TEvStatistics::TPtr& ev) {
TaskRunner->UpdateStats();
THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats;
for (const auto sinkId : ev->Get()->SinkIds) {
sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats();
}
ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats));
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- ev->Release().Release(),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ ev->Release().Release(),
+ /*flags=*/0,
+ ev->Cookie);
}
- void OnLoadTaskRunnerFromState(TEvLoadTaskRunnerFromState::TPtr& ev, const TActorContext& ctx) {
+ void OnLoadTaskRunnerFromState(TEvLoadTaskRunnerFromState::TPtr& ev) {
TMaybe<TString> error = Nothing();
try {
auto guard = TaskRunner->BindAllocator();
@@ -108,13 +103,11 @@ private:
} catch (const std::exception& e) {
error = e.what();
}
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- new TEvLoadTaskRunnerFromStateDone(std::move(error)),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ new TEvLoadTaskRunnerFromStateDone(std::move(error)),
+ /*flags=*/0,
+ ev->Cookie);
}
void PassAway() override {
@@ -141,14 +134,12 @@ private:
return true;
}
- void DoContinueRun(TEvContinueRun::TPtr& ev, const TActorContext& ctx) {
+ void DoContinueRun(TEvContinueRun::TPtr& ev) {
auto guard = TaskRunner->BindAllocator(MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : ev->Get()->MemLimit);
auto inputMap = ev->Get()->AskFreeSpace
? Inputs
: ev->Get()->InputChannels;
- auto& sourcesMap = Sources;
-
NYql::NDq::ERunStatus res = ERunStatus::Finished;
THashMap<ui32, ui64> inputChannelFreeSpace;
THashMap<ui32, ui64> sourcesFreeSpace;
@@ -161,7 +152,7 @@ private:
inputChannelFreeSpace[channelId] = TaskRunner->GetInputChannel(channelId)->GetFreeSpace();
}
- for (auto& index : sourcesMap) {
+ for (auto& index : Sources) {
sourcesFreeSpace[index] = TaskRunner->GetSource(index)->GetFreeSpace();
}
}
@@ -170,7 +161,6 @@ private:
if ((res == ERunStatus::PendingInput || res == ERunStatus::Finished) && ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) {
mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>();
try {
- auto guard = TaskRunner->BindAllocator();
mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData();
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
@@ -193,28 +183,26 @@ private:
MemoryQuota->TryShrinkMemory(guard.GetMutex());
}
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- new TEvTaskRunFinished(
- res,
- std::move(inputChannelFreeSpace),
- std::move(sourcesFreeSpace),
- {},
- MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(),
- MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0,
- std::move(mkqlProgramState),
- ev->Get()->CheckpointRequest.Defined()),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ new TEvTaskRunFinished(
+ res,
+ std::move(inputChannelFreeSpace),
+ std::move(sourcesFreeSpace),
+ {},
+ MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(),
+ MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0,
+ std::move(mkqlProgramState),
+ ev->Get()->CheckpointRequest.Defined()),
+ /*flags=*/0,
+ ev->Cookie);
}
- void OnContinueRun(TEvContinueRun::TPtr& ev, const TActorContext& ctx) {
- DoContinueRun(ev, ctx);
+ void OnContinueRun(TEvContinueRun::TPtr& ev) {
+ DoContinueRun(ev);
}
- void OnChannelPush(TEvPush::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnChannelPush(TEvPush::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
auto hasData = ev->Get()->HasData;
auto finish = ev->Get()->Finish;
@@ -241,13 +229,11 @@ private:
}
// run
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- new TEvContinueRun(channelId, freeSpace),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ new TEvContinueRun(channelId, freeSpace),
+ /*flags=*/0,
+ ev->Cookie);
}
void AsyncInputPush(
@@ -257,22 +243,19 @@ private:
i64 space,
bool finish) override
{
- auto& ctx = NActors::TlsActivationContext;
auto source = TaskRunner->GetSource(index);
source->Push(std::move(batch), space);
if (finish) {
source->Finish();
}
- ctx->Send(
- new IEventHandle(
- ParentId,
- SelfId(),
- new TEvAsyncInputPushFinished(index),
- /*flags=*/0,
- cookie));
+ Send(
+ ParentId,
+ new TEvAsyncInputPushFinished(index),
+ /*flags=*/0,
+ cookie);
}
- void OnChannelPop(TEvPop::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnChannelPop(TEvPop::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
auto channelId = ev->Get()->ChannelId;
@@ -313,20 +296,18 @@ private:
}
}
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- new TEvChannelPopFinished(
- channelId,
- std::move(chunks),
- std::move(checkpoint),
- isFinished,
- changed,
- {},
- TDqTaskRunnerStatsView(TaskRunner->GetStats())),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ new TEvChannelPopFinished(
+ channelId,
+ std::move(chunks),
+ std::move(checkpoint),
+ isFinished,
+ changed,
+ {},
+ TDqTaskRunnerStatsView(TaskRunner->GetStats())),
+ /*flags=*/0,
+ ev->Cookie);
}
void ResumeInputs() {
@@ -335,8 +316,7 @@ private:
}
}
- void OnSinkPop(TEvSinkPop::TPtr& ev, const NActors::TActorContext& ctx) {
- Y_UNUSED(ctx);
+ void OnSinkPop(TEvSinkPop::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
auto sink = TaskRunner->GetSink(ev->Get()->Index);
@@ -349,19 +329,19 @@ private:
if (ev->Get()->Size > 0) {
size = sink->Pop(batch, ev->Get()->Size);
}
- bool hasCheckpoint = sink->Pop(checkpoint);
+ const bool hasCheckpoint = sink->Pop(checkpoint);
if (hasCheckpoint) {
checkpointSize = checkpoint.ByteSize();
maybeCheckpoint.ConstructInPlace(std::move(checkpoint));
ResumeInputs();
}
- auto finished = sink->IsFinished();
- bool changed = finished || ev->Get()->Size > 0 || hasCheckpoint;
+ const bool finished = sink->IsFinished();
+ const bool changed = finished || size > 0 || hasCheckpoint;
Parent->SinkSend(ev->Get()->Index, std::move(batch), std::move(maybeCheckpoint), checkpointSize, size, finished, changed);
}
- void OnDqTask(TEvTaskRunnerCreate::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
TaskRunner = Factory(ev->Get()->Task, [this](const TString& message) {
LOG_D(message);
@@ -392,13 +372,11 @@ private:
TaskRunner->GetTypeEnv(),
TaskRunner->GetHolderFactory());
- ctx.Send(
- new IEventHandle(
- ev->Sender,
- SelfId(),
- event.Release(),
- /*flags=*/0,
- ev->Cookie));
+ Send(
+ ev->Sender,
+ event.Release(),
+ /*flags=*/0,
+ ev->Cookie);
}
THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) {