diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-24 19:19:16 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-24 19:19:16 +0300 |
commit | 86488464a705e3512cdba41ed37b203d84ba17b6 (patch) | |
tree | bd0e15c77c75008fedf5fc7a33e68d0b5ed4a949 | |
parent | 25ffc448423b8f6ab955a036ec8f15a206a4ba93 (diff) | |
download | ydb-86488464a705e3512cdba41ed37b203d84ba17b6.tar.gz |
KIKIMR-18556,KIKIMR-18783: reuse 11963924,11955437
4 files changed, 205 insertions, 126 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 53494e31c36..938f5f3ba32 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -194,7 +194,7 @@ private: return TaskRunnerStats.GetInputTransformStats(inputIdx); } - void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override { + void DrainOutputChannel(TOutputChannelInfo& outputChannel) override { YQL_ENSURE(!outputChannel.Finished || Checkpoints); if (outputChannel.PopStarted) { @@ -204,37 +204,29 @@ private: const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.ChannelId; - const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - - const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; + const auto& peerState = Channels->GetOutputChannelInFlightState(channelId); CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer - << ", peerFreeSpace: " << peerState.PeerFreeSpace - << ", inFlightBytes: " << peerState.InFlightBytes - << ", inFlightRows: " << peerState.InFlightRows - << ", inFlightCount: " << peerState.InFlightCount - << ", allowedOvercommit: " << allowedOvercommit - << ", toSend: " << toSend + << ", peerState:(" << peerState.DebugString() << ")" // << ", finished: " << outputChannel.Channel->IsFinished()); ); outputChannel.PopStarted = true; + const bool hasFreeMemory = peerState.HasFreeMemory(); + UpdateBlocked(outputChannel, !hasFreeMemory); ProcessOutputsState.Inflight++; - UpdateBlocked(outputChannel, toSend); - if (toSend <= 0) { + if (!hasFreeMemory) { CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId - << ". To send: " << toSend - << ". Free space: " << peerState.PeerFreeSpace - << ". Inflight: " << peerState.InFlightBytes - << ". Allowed overcommit: " << allowedOvercommit); + << ", peerState:(" << peerState.DebugString() << ")" + ); auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId); Y_VERIFY(!ev->Finished); Send(SelfId(), std::move(ev)); // try again, ev.Finished == false return; } - Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend)); + Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, peerState.GetFreeMemory())); } void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override { @@ -547,8 +539,7 @@ private: auto& source = it->second; source.PushStarted = false; source.FreeSpace = ev->Get()->FreeSpaceLeft; - ProcessSourcesState.Inflight--; - if (ProcessSourcesState.Inflight == 0) { + if (--ProcessSourcesState.Inflight == 0) { CA_LOG_T("Send TEvContinueRun on OnAsyncInputPushFinished"); AskContinueRun(Nothing(), false); } @@ -584,6 +575,9 @@ private: if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution. return false; } + if (!shouldSkipData && !Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) { + return false; + } auto& asyncData = *outputChannel.AsyncData; outputChannel.Finished = asyncData.Finished || shouldSkipData; @@ -628,7 +622,7 @@ private: if (lastChunk && asyncData.Checkpoint.Defined()) { channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } - Channels->SendChannelData(std::move(channelData)); + Channels->SendChannelData(std::move(channelData), i + 1 == asyncData.Data.size()); } if (asyncData.Data.empty() && asyncData.Changed) { TChannelDataOOB channelData; @@ -640,7 +634,7 @@ private: if (asyncData.Checkpoint.Defined()) { channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } - Channels->SendChannelData(std::move(channelData)); + Channels->SendChannelData(std::move(channelData), true); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index 48e591d875e..d60aa2f81e9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -65,7 +65,7 @@ TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& tx for (auto& channel : task.GetOutputs(i).GetChannels()) { TOutputChannelState outputChannel; outputChannel.ChannelId = channel.GetId(); - outputChannel.PeerState.PeerFreeSpace = channelBufferSize; + outputChannel.PeerState.ActualizeFreeSpace(channelBufferSize); if (channel.GetDstEndpoint().HasActorId()) { outputChannel.Peer = ActorIdFromProto(channel.GetDstEndpoint().GetActorId()); @@ -205,22 +205,13 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr& // remove all messages with seqNo <= ackSeqNo auto it = outputChannel.InFlight.begin(); while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) { - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightBytes >= it->second.Data.PayloadSize()); - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightRows >= it->second.Data.RowCount()); - Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightCount >= 1); - - outputChannel.PeerState.InFlightBytes -= it->second.Data.PayloadSize(); - outputChannel.PeerState.InFlightRows -= it->second.Data.RowCount(); - outputChannel.PeerState.InFlightCount -= 1; + outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount()); it = outputChannel.InFlight.erase(it); } - outputChannel.PeerState.PrevPeerFreeSpace = outputChannel.PeerState.PeerFreeSpace; - outputChannel.PeerState.PeerFreeSpace = record.GetFreeSpace(); + outputChannel.PeerState.ActualizeFreeSpace(record.GetFreeSpace()); - LOG_T("PeerState, freeSpace: " << outputChannel.PeerState.PeerFreeSpace - << ", inflight bytes: " << outputChannel.PeerState.InFlightBytes - << ", inflight count: " << outputChannel.PeerState.InFlightCount + LOG_T("PeerState, peerState:(" << outputChannel.PeerState.DebugString() << ")" << ", sentSeqNo: " << outputChannel.LastSentSeqNo << ", ackSeqNo: " << record.GetSeqNo()); @@ -560,8 +551,13 @@ void TDqComputeActorChannels::SetOutputChannelPeer(ui64 channelId, const TActorI outputChannel.Peer = peer; } -bool TDqComputeActorChannels::CanSendChannelData(ui64 channelId) { - TOutputChannelState& outputChannel = OutCh(channelId); +bool TDqComputeActorChannels::HasFreeMemoryInChannel(const ui64 channelId) const { + const TOutputChannelState& outputChannel = OutCh(channelId); + return outputChannel.PeerState.HasFreeMemory(); +} + +bool TDqComputeActorChannels::CanSendChannelData(const ui64 channelId) const { + const TOutputChannelState& outputChannel = OutCh(channelId); return outputChannel.Peer && (!outputChannel.Finished || SupportCheckpoints) && !outputChannel.RetryState; } @@ -570,16 +566,16 @@ bool TDqComputeActorChannels::ShouldSkipData(ui64 channelId) { return outputChannel.Finished && !SupportCheckpoints; } -void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) { +void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, const bool needAck) { TOutputChannelState& outputChannel = OutCh(channelData.Proto.GetChannelId()); YQL_ENSURE(!outputChannel.Finished || SupportCheckpoints); YQL_ENSURE(!outputChannel.RetryState); - ui64 seqNo = ++outputChannel.LastSentSeqNo; - ui32 chunkBytes = channelData.Proto.GetData().GetRaw().size() + channelData.Payload.size(); - ui32 chunkRows = channelData.Proto.GetData().GetRows(); - bool finished = channelData.Proto.GetFinished(); + const ui64 seqNo = ++outputChannel.LastSentSeqNo; + const ui32 chunkBytes = channelData.PayloadSize(); + const ui32 chunkRows = channelData.RowCount(); + const bool finished = channelData.Proto.GetFinished(); LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId() << ", peer: " << *outputChannel.Peer @@ -621,11 +617,10 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) { outputChannel.Finished = finished; ui32 flags = CalcMessageFlags(*outputChannel.Peer); + dataEv->Record.SetNoAck(!needAck); Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId); - outputChannel.PeerState.InFlightBytes += chunkBytes; - outputChannel.PeerState.InFlightRows += chunkRows; - outputChannel.PeerState.InFlightCount += 1; + outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows); } bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) { @@ -801,7 +796,13 @@ TDqComputeActorChannels::TInputChannelState& TDqComputeActorChannels::InCh(ui64 return *ch; } -TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(ui64 channelId) { +const TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) const { + auto ch = OutputChannelsMap.FindPtr(channelId); + YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); + return *ch; +} + +TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) { auto ch = OutputChannelsMap.FindPtr(channelId); YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); return *ch; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h index cade7d4fb70..c878c228d12 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h @@ -7,17 +7,59 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/log.h> +#include <util/generic/noncopyable.h> namespace NYql::NDq { class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels> { public: - struct TPeerState { + struct TPeerState: NNonCopyable::TMoveOnly { + private: + static const ui32 InterconnectHeadersSize = 96; + i64 InFlightBytes = 0; + i64 InFlightRows = 0; + i32 InFlightCount = 0; i64 PeerFreeSpace = 0; - ui64 InFlightBytes = 0; - ui64 InFlightRows = 0; - ui32 InFlightCount = 0; - i64 PrevPeerFreeSpace = 0; + public: + i64 GetFreeMemory() const { + return PeerFreeSpace - InFlightBytes; + } + + bool NeedAck() const { + return (InFlightCount % 16 == 0) || !HasFreeMemory(); + } + + TString DebugString() const { + return TStringBuilder() << + "freeSpace:" << PeerFreeSpace << ";" << + "inFlightBytes:" << InFlightBytes << ";" << + "inFlightCount:" << InFlightCount << ";" + ; + } + + bool HasFreeMemory() const { + return PeerFreeSpace >= InFlightBytes; + } + + void ActualizeFreeSpace(const i64 actual) { + PeerFreeSpace = actual; + } + + void AddInFlight(const ui64 bytes, const ui64 rows) { + InFlightBytes += bytes + InterconnectHeadersSize; + InFlightRows += rows; + InFlightCount += 1; + } + + void RemoveInFlight(const ui64 bytes, const ui64 rows) { + InFlightBytes -= (bytes + InterconnectHeadersSize); + Y_VERIFY(InFlightBytes >= 0); + InFlightRows -= rows; + Y_VERIFY(InFlightRows >= 0); + InFlightCount -= 1; + Y_VERIFY(InFlightCount >= 0); + } }; struct ICallbacks { @@ -75,8 +117,9 @@ public: void SetCheckpointsSupport(); // Finished channels will be polled for checkpoints. void SetInputChannelPeer(ui64 channelId, const NActors::TActorId& peer); void SetOutputChannelPeer(ui64 channelId, const NActors::TActorId& peer); - bool CanSendChannelData(ui64 channelId); - void SendChannelData(TChannelDataOOB&& channelData); + bool CanSendChannelData(const ui64 channelId) const; + bool HasFreeMemoryInChannel(const ui64 channelId) const; + void SendChannelData(TChannelDataOOB&& channelData, const bool needAck); void SendChannelDataAck(i64 channelId, i64 freeSpace); bool PollChannel(ui64 channelId, i64 freeSpace); bool CheckInFlight(const TString& prefix); @@ -179,6 +222,7 @@ private: ui32 CalcMessageFlags(const NActors::TActorId& peer); TInputChannelState& InCh(ui64 channelId); TOutputChannelState& OutCh(ui64 channelId); + const TOutputChannelState& OutCh(const ui64 channelId) const; private: const NActors::TActorId Owner; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index ef64de90857..274e2c03647 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -350,7 +350,7 @@ protected: // Send checkpoints to output channels. ProcessOutputsImpl(ERunStatus::Finished); - return true; // returns true, when channels were handled syncronously + return true; // returns true, when channels were handled synchronously } void ProcessOutputsImpl(ERunStatus status) { @@ -379,8 +379,7 @@ protected: if (!outputChannel.Finished || Checkpoints) { if (Channels->CanSendChannelData(channelId)) { - auto peerState = Channels->GetOutputChannelInFlightState(channelId); - DrainOutputChannel(outputChannel, peerState); + DrainOutputChannel(outputChannel); } else { ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; } @@ -1017,6 +1016,87 @@ protected: bool Changed = false; }; TMaybe<TAsyncData> AsyncData; + + class TDrainedChannelMessage { + private: + TDqSerializedBatch Data; + NDqProto::TWatermark Watermark; + NDqProto::TCheckpoint Checkpoint; + + ui32 DataSize = 0; + ui32 WatermarkSize = 0; + ui32 CheckpointSize = 0; + + bool HasData = false; + bool HasWatermark = false; + bool HasCheckpoint = false; + bool Finished = false; + public: + const NDqProto::TWatermark* GetWatermarkOptional() const { + return HasWatermark ? &Watermark : nullptr; + } + const NDqProto::TCheckpoint* GetCheckpointOptional() const { + return HasCheckpoint ? &Checkpoint : nullptr; + } + + TChannelDataOOB BuildChannelData(const ui64 channelId) { + TChannelDataOOB channelData; + channelData.Proto.SetChannelId(channelId); + channelData.Proto.SetFinished(Finished); + if (HasData) { + channelData.Proto.MutableData()->Swap(&Data.Proto); + channelData.Payload = std::move(Data.Payload); + } + if (HasWatermark) { + channelData.Proto.MutableWatermark()->Swap(&Watermark); + } + if (HasCheckpoint) { + channelData.Proto.MutableCheckpoint()->Swap(&Checkpoint); + } + + Y_VERIFY(HasData || HasWatermark || HasCheckpoint || Finished); + return channelData; + } + + bool ReadData(const TOutputChannelInfo& outputChannel) { + auto channel = outputChannel.Channel; + + HasData = channel->Pop(Data); + HasWatermark = channel->Pop(Watermark); + HasCheckpoint = channel->Pop(Checkpoint); + Finished = !outputChannel.Finished && channel->IsFinished(); + + if (!HasData && !HasWatermark && !HasCheckpoint && !Finished) { + return false; + } + + DataSize = Data.Size(); + WatermarkSize = Watermark.ByteSize(); + CheckpointSize = Checkpoint.ByteSize(); + + return true; + } + }; + + std::vector<TDrainedChannelMessage> DrainChannel(const ui32 countLimit) { + std::vector<TDrainedChannelMessage> result; + if (Finished) { + Y_VERIFY(Channel->IsFinished()); + return result; + } + result.reserve(countLimit); + for (ui32 i = 0; i < countLimit && !Finished; ++i) { + TDrainedChannelMessage message; + if (!message.ReadData(*this)) { + break; + } + result.emplace_back(std::move(message)); + if (Channel->IsFinished()) { + Finished = true; + } + } + return result; + } }; struct TAsyncOutputInfoBase { @@ -1265,9 +1345,9 @@ protected: protected: - void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) { + void UpdateBlocked(TOutputChannelInfo& outputChannel, const bool blocked) { if (Y_UNLIKELY(outputChannel.Stats)) { - if (toSend <= 0) { + if (blocked) { outputChannel.Stats->BlockedByCapacity++; if (!outputChannel.Stats->StartBlockedTime) { outputChannel.Stats->StartBlockedTime = TInstant::Now(); @@ -1287,94 +1367,54 @@ private: return MemoryQuota->GetProfileStats(); } - virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) { + virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel) { YQL_ENSURE(!outputChannel.Finished || Checkpoints); const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.Channel->GetChannelId(); - const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - - const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; - CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer - << ", peerFreeSpace: " << peerState.PeerFreeSpace - << ", inFlightBytes: " << peerState.InFlightBytes - << ", inFlightRows: " << peerState.InFlightRows - << ", inFlightCount: " << peerState.InFlightCount - << ", allowedOvercommit: " << allowedOvercommit - << ", toSend: " << toSend << ", finished: " << outputChannel.Channel->IsFinished()); ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - UpdateBlocked(outputChannel, toSend); + UpdateBlocked(outputChannel, !Channels->HasFreeMemoryInChannel(channelId)); - i64 remains = toSend; - while (remains > 0 && (!outputChannel.Finished || Checkpoints)) { - ui32 sent = this->SendChannelDataChunk(outputChannel); - if (sent == 0) { + ui32 sentChunks = 0; + while ((!outputChannel.Finished || Checkpoints) && + Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) + { + const static ui32 drainPackSize = 16; + std::vector<typename TOutputChannelInfo::TDrainedChannelMessage> channelData = outputChannel.DrainChannel(drainPackSize); + ui32 idx = 0; + for (auto&& i : channelData) { + if (auto* w = i.GetWatermarkOptional()) { + CA_LOG_I("Resume inputs by watermark"); + // This is excessive, inputs should be resumed after async CA received response with watermark from task runner. + // But, let it be here, it's better to have the same code as in checkpoints + ResumeInputsByWatermark(TInstant::MicroSeconds(w->GetTimestampUs())); + } + if (i.GetCheckpointOptional()) { + CA_LOG_I("Resume inputs by checkpoint"); + ResumeInputsByCheckpoint(); + } + + Channels->SendChannelData(i.BuildChannelData(outputChannel.ChannelId), ++idx == channelData.size()); + ++sentChunks; + } + if (drainPackSize != channelData.size()) { + if (!outputChannel.Finished) { + CA_LOG_T("output channelId: " << outputChannel.ChannelId << ", nothing to send and is not finished"); + } break; } - remains -= sent; } ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend; - } - - ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) { - auto channel = outputChannel.Channel; - - TDqSerializedBatch data; - NDqProto::TWatermark watermark; - NDqProto::TCheckpoint checkpoint; - - bool hasData = channel->Pop(data); - bool hasWatermark = channel->Pop(watermark); - bool hasCheckpoint = channel->Pop(checkpoint); - if (!hasData && !hasWatermark && !hasCheckpoint) { - if (!channel->IsFinished()) { - CA_LOG_T("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished"); - return 0; // channel is empty and not finished yet - } - } - const bool wasFinished = outputChannel.Finished; - outputChannel.Finished = channel->IsFinished(); - const bool becameFinished = !wasFinished && outputChannel.Finished; - - ui32 dataSize = data.Size(); - ui32 watermarkSize = watermark.ByteSize(); - ui32 checkpointSize = checkpoint.ByteSize(); - - TChannelDataOOB channelData; - channelData.Proto.SetChannelId(channel->GetChannelId()); - channelData.Proto.SetFinished(outputChannel.Finished); - if (hasData) { - channelData.Proto.MutableData()->Swap(&data.Proto); - channelData.Payload = std::move(data.Payload); - } - if (hasWatermark) { - channelData.Proto.MutableWatermark()->Swap(&watermark); - CA_LOG_I("Resume inputs by watermark"); - // This is excessive, inputs should be resumed after async CA received response with watermark from task runner. - // But, let it be here, it's better to have the same code as in checkpoints - ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs())); - } - if (hasCheckpoint) { - channelData.Proto.MutableCheckpoint()->Swap(&checkpoint); - CA_LOG_I("Resume inputs by checkpoint"); - ResumeInputsByCheckpoint(); - } - - if (hasData || hasWatermark || hasCheckpoint || becameFinished) { - Channels->SendChannelData(std::move(channelData)); - return dataSize + watermarkSize + checkpointSize; - } - return 0; + ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks; } virtual void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) { |