diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-18 07:48:47 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-18 07:48:47 +0300 |
commit | 52272d2c11e45e04d6875601fd38c7adb8f23af2 (patch) | |
tree | d9123cfb8433788570f3a2835405e87862969d8f | |
parent | 8dfcb1a9f58b63dbc582bc728a5e979afc19d789 (diff) | |
download | ydb-52272d2c11e45e04d6875601fd38c7adb8f23af2.tar.gz |
KIKIMR-18556:use peer state for correct memory volume calculation in case actor system queue in interconnect
+ TrySendAsyncChannelData check memory usage
4 files changed, 97 insertions, 72 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 53494e31c36..34c2e143bde 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -194,7 +194,7 @@ private: return TaskRunnerStats.GetInputTransformStats(inputIdx); } - void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override { + void DrainOutputChannel(TOutputChannelInfo& outputChannel) override { YQL_ENSURE(!outputChannel.Finished || Checkpoints); if (outputChannel.PopStarted) { @@ -204,37 +204,29 @@ private: const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.ChannelId; - const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - - const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; + const auto& peerState = Channels->GetOutputChannelInFlightState(channelId); CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer - << ", peerFreeSpace: " << peerState.PeerFreeSpace - << ", inFlightBytes: " << peerState.InFlightBytes - << ", inFlightRows: " << peerState.InFlightRows - << ", inFlightCount: " << peerState.InFlightCount - << ", allowedOvercommit: " << allowedOvercommit - << ", toSend: " << toSend + << ", peerState:(" << peerState.DebugString() << ")" // << ", finished: " << outputChannel.Channel->IsFinished()); ); outputChannel.PopStarted = true; - ProcessOutputsState.Inflight++; - UpdateBlocked(outputChannel, toSend); - if (toSend <= 0) { + const bool hasFreeMemory = peerState.HasFreeMemory(); + UpdateBlocked(outputChannel, !hasFreeMemory); + if (!hasFreeMemory) { CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId - << ". To send: " << toSend - << ". Free space: " << peerState.PeerFreeSpace - << ". Inflight: " << peerState.InFlightBytes - << ". Allowed overcommit: " << allowedOvercommit); + << ", peerState:(" << peerState.DebugString() << ")" + ); auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId); Y_VERIFY(!ev->Finished); Send(SelfId(), std::move(ev)); // try again, ev.Finished == false return; } + ProcessOutputsState.Inflight++; - Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend)); + Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, peerState.GetFreeMemory())); } void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override { @@ -584,6 +576,9 @@ private: if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution. return false; } + if (!Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) { + return false; + } auto& asyncData = *outputChannel.AsyncData; outputChannel.Finished = asyncData.Finished || shouldSkipData; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index 48e591d875e..dd0d1321cd2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -65,7 +65,7 @@ TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& tx for (auto& channel : task.GetOutputs(i).GetChannels()) { TOutputChannelState outputChannel; outputChannel.ChannelId = channel.GetId(); - outputChannel.PeerState.PeerFreeSpace = channelBufferSize; + outputChannel.PeerState.ActualizeFreeSpace(channelBufferSize); if (channel.GetDstEndpoint().HasActorId()) { outputChannel.Peer = ActorIdFromProto(channel.GetDstEndpoint().GetActorId()); @@ -205,22 +205,13 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr& // remove all messages with seqNo <= ackSeqNo auto it = outputChannel.InFlight.begin(); while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) { - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightBytes >= it->second.Data.PayloadSize()); - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightRows >= it->second.Data.RowCount()); - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightCount >= 1); - - outputChannel.PeerState.InFlightBytes -= it->second.Data.PayloadSize(); - outputChannel.PeerState.InFlightRows -= it->second.Data.RowCount(); - outputChannel.PeerState.InFlightCount -= 1; + outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount()); it = outputChannel.InFlight.erase(it); } - outputChannel.PeerState.PrevPeerFreeSpace = outputChannel.PeerState.PeerFreeSpace; - outputChannel.PeerState.PeerFreeSpace = record.GetFreeSpace(); + outputChannel.PeerState.ActualizeFreeSpace(record.GetFreeSpace()); - LOG_T("PeerState, freeSpace: " << outputChannel.PeerState.PeerFreeSpace - << ", inflight bytes: " << outputChannel.PeerState.InFlightBytes - << ", inflight count: " << outputChannel.PeerState.InFlightCount + LOG_T("PeerState, peerState:(" << outputChannel.PeerState.DebugString() << ")" << ", sentSeqNo: " << outputChannel.LastSentSeqNo << ", ackSeqNo: " << record.GetSeqNo()); @@ -560,8 +551,13 @@ void TDqComputeActorChannels::SetOutputChannelPeer(ui64 channelId, const TActorI outputChannel.Peer = peer; } -bool TDqComputeActorChannels::CanSendChannelData(ui64 channelId) { - TOutputChannelState& outputChannel = OutCh(channelId); +bool TDqComputeActorChannels::HasFreeMemoryInChannel(const ui64 channelId) const { + const TOutputChannelState& outputChannel = OutCh(channelId); + return outputChannel.PeerState.HasFreeMemory(); +} + +bool TDqComputeActorChannels::CanSendChannelData(const ui64 channelId) const { + const TOutputChannelState& outputChannel = OutCh(channelId); return outputChannel.Peer && (!outputChannel.Finished || SupportCheckpoints) && !outputChannel.RetryState; } @@ -576,10 +572,10 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) { YQL_ENSURE(!outputChannel.Finished || SupportCheckpoints); YQL_ENSURE(!outputChannel.RetryState); - ui64 seqNo = ++outputChannel.LastSentSeqNo; - ui32 chunkBytes = channelData.Proto.GetData().GetRaw().size() + channelData.Payload.size(); - ui32 chunkRows = channelData.Proto.GetData().GetRows(); - bool finished = channelData.Proto.GetFinished(); + const ui64 seqNo = ++outputChannel.LastSentSeqNo; + const ui32 chunkBytes = channelData.PayloadSize(); + const ui32 chunkRows = channelData.RowCount(); + const bool finished = channelData.Proto.GetFinished(); LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId() << ", peer: " << *outputChannel.Peer @@ -623,9 +619,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) { ui32 flags = CalcMessageFlags(*outputChannel.Peer); Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId); - outputChannel.PeerState.InFlightBytes += chunkBytes; - outputChannel.PeerState.InFlightRows += chunkRows; - outputChannel.PeerState.InFlightCount += 1; + outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows); } bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) { @@ -801,7 +795,13 @@ TDqComputeActorChannels::TInputChannelState& TDqComputeActorChannels::InCh(ui64 return *ch; } -TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(ui64 channelId) { +const TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) const { + auto ch = OutputChannelsMap.FindPtr(channelId); + YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); + return *ch; +} + +TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) { auto ch = OutputChannelsMap.FindPtr(channelId); YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); return *ch; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h index cade7d4fb70..8cedb791a4e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h @@ -7,17 +7,59 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/log.h> +#include <util/generic/noncopyable.h> namespace NYql::NDq { class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels> { public: - struct TPeerState { + struct TPeerState: NNonCopyable::TMoveOnly { + private: + static const ui32 InterconnectHeadersSize = 96; + i64 InFlightBytes = 0; + i64 InFlightRows = 0; + i32 InFlightCount = 0; i64 PeerFreeSpace = 0; - ui64 InFlightBytes = 0; - ui64 InFlightRows = 0; - ui32 InFlightCount = 0; - i64 PrevPeerFreeSpace = 0; + public: + i64 GetFreeMemory() const { + return PeerFreeSpace - InFlightBytes; + } + + bool NeedAck() const { + return (InFlightCount % 16 == 0) || !HasFreeMemory(); + } + + TString DebugString() const { + return TStringBuilder() << + "freeSpace:" << PeerFreeSpace << ";" << + "inFlightBytes:" << InFlightBytes << ";" << + "inFlightCount:" << InFlightCount << ";" + ; + } + + bool HasFreeMemory() const { + return PeerFreeSpace >= InFlightBytes; + } + + void ActualizeFreeSpace(const i64 actual) { + PeerFreeSpace = actual; + } + + void AddInFlight(const ui64 bytes, const ui64 rows) { + InFlightBytes += bytes + InterconnectHeadersSize; + InFlightRows += rows; + InFlightCount += 1; + } + + void RemoveInFlight(const ui64 bytes, const ui64 rows) { + InFlightBytes -= (bytes + InterconnectHeadersSize); + Y_VERIFY(InFlightBytes >= 0); + InFlightRows -= rows; + Y_VERIFY(InFlightRows >= 0); + InFlightCount -= 1; + Y_VERIFY(InFlightCount >= 0); + } }; struct ICallbacks { @@ -75,7 +117,8 @@ public: void SetCheckpointsSupport(); // Finished channels will be polled for checkpoints. void SetInputChannelPeer(ui64 channelId, const NActors::TActorId& peer); void SetOutputChannelPeer(ui64 channelId, const NActors::TActorId& peer); - bool CanSendChannelData(ui64 channelId); + bool CanSendChannelData(const ui64 channelId) const; + bool HasFreeMemoryInChannel(const ui64 channelId) const; void SendChannelData(TChannelDataOOB&& channelData); void SendChannelDataAck(i64 channelId, i64 freeSpace); bool PollChannel(ui64 channelId, i64 freeSpace); @@ -179,6 +222,7 @@ private: ui32 CalcMessageFlags(const NActors::TActorId& peer); TInputChannelState& InCh(ui64 channelId); TOutputChannelState& OutCh(ui64 channelId); + const TOutputChannelState& OutCh(const ui64 channelId) const; private: const NActors::TActorId Owner; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index ef64de90857..6e375bda158 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -350,7 +350,7 @@ protected: // Send checkpoints to output channels. ProcessOutputsImpl(ERunStatus::Finished); - return true; // returns true, when channels were handled syncronously + return true; // returns true, when channels were handled synchronously } void ProcessOutputsImpl(ERunStatus status) { @@ -379,8 +379,7 @@ protected: if (!outputChannel.Finished || Checkpoints) { if (Channels->CanSendChannelData(channelId)) { - auto peerState = Channels->GetOutputChannelInFlightState(channelId); - DrainOutputChannel(outputChannel, peerState); + DrainOutputChannel(outputChannel); } else { ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; } @@ -1265,9 +1264,9 @@ protected: protected: - void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) { + void UpdateBlocked(TOutputChannelInfo& outputChannel, const bool blocked) { if (Y_UNLIKELY(outputChannel.Stats)) { - if (toSend <= 0) { + if (blocked) { outputChannel.Stats->BlockedByCapacity++; if (!outputChannel.Stats->StartBlockedTime) { outputChannel.Stats->StartBlockedTime = TInstant::Now(); @@ -1287,43 +1286,30 @@ private: return MemoryQuota->GetProfileStats(); } - virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) { + virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel) { YQL_ENSURE(!outputChannel.Finished || Checkpoints); const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.Channel->GetChannelId(); - const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - - const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; - CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer - << ", peerFreeSpace: " << peerState.PeerFreeSpace - << ", inFlightBytes: " << peerState.InFlightBytes - << ", inFlightRows: " << peerState.InFlightRows - << ", inFlightCount: " << peerState.InFlightCount - << ", allowedOvercommit: " << allowedOvercommit - << ", toSend: " << toSend << ", finished: " << outputChannel.Channel->IsFinished()); ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - UpdateBlocked(outputChannel, toSend); + UpdateBlocked(outputChannel, !Channels->HasFreeMemoryInChannel(channelId)); - i64 remains = toSend; - while (remains > 0 && (!outputChannel.Finished || Checkpoints)) { - ui32 sent = this->SendChannelDataChunk(outputChannel); - if (sent == 0) { - break; - } - remains -= sent; + ui32 sentChunks = 0; + while ((!outputChannel.Finished || Checkpoints) && + Channels->HasFreeMemoryInChannel(outputChannel.ChannelId) && SendChannelDataChunk(outputChannel)) { + ++sentChunks; } ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend; + ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks; } ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) { |