diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-24 14:38:41 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-24 14:38:41 +0300 |
commit | 5d8ffcc517280cf9d851da16c811b2052e16df04 (patch) | |
tree | 7ee27f618e8c2f613126e8b4c6557a222db976b3 | |
parent | d335551d092a04a48aa7ca00f95e4bc5cb066816 (diff) | |
download | ydb-5d8ffcc517280cf9d851da16c811b2052e16df04.tar.gz |
temporary revert. fix tests
4 files changed, 125 insertions, 205 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index a6c204d4e8b..53494e31c36 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -194,7 +194,7 @@ private: return TaskRunnerStats.GetInputTransformStats(inputIdx); } - void DrainOutputChannel(TOutputChannelInfo& outputChannel) override { + void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override { YQL_ENSURE(!outputChannel.Finished || Checkpoints); if (outputChannel.PopStarted) { @@ -204,29 +204,37 @@ private: const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.ChannelId; - const auto& peerState = Channels->GetOutputChannelInFlightState(channelId); + const ui32 allowedOvercommit = AllowedChannelsOvercommit(); + + const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer - << ", peerState:(" << peerState.DebugString() << ")" + << ", peerFreeSpace: " << peerState.PeerFreeSpace + << ", inFlightBytes: " << peerState.InFlightBytes + << ", inFlightRows: " << peerState.InFlightRows + << ", inFlightCount: " << peerState.InFlightCount + << ", allowedOvercommit: " << allowedOvercommit + << ", toSend: " << toSend // << ", finished: " << outputChannel.Channel->IsFinished()); ); outputChannel.PopStarted = true; - const bool hasFreeMemory = peerState.HasFreeMemory(); - UpdateBlocked(outputChannel, !hasFreeMemory); - if (!hasFreeMemory) { + ProcessOutputsState.Inflight++; + UpdateBlocked(outputChannel, toSend); + if (toSend <= 0) { CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId - << ", peerState:(" << peerState.DebugString() << ")" - ); + << ". To send: " << toSend + << ". Free space: " << peerState.PeerFreeSpace + << ". Inflight: " << peerState.InFlightBytes + << ". Allowed overcommit: " << allowedOvercommit); auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId); Y_VERIFY(!ev->Finished); Send(SelfId(), std::move(ev)); // try again, ev.Finished == false return; } - ProcessOutputsState.Inflight++; - Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, peerState.GetFreeMemory())); + Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend)); } void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override { @@ -576,9 +584,6 @@ private: if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution. return false; } - if (!Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) { - return false; - } auto& asyncData = *outputChannel.AsyncData; outputChannel.Finished = asyncData.Finished || shouldSkipData; @@ -623,7 +628,7 @@ private: if (lastChunk && asyncData.Checkpoint.Defined()) { channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } - Channels->SendChannelData(std::move(channelData), i + 1 == asyncData.Data.size()); + Channels->SendChannelData(std::move(channelData)); } if (asyncData.Data.empty() && asyncData.Changed) { TChannelDataOOB channelData; @@ -635,7 +640,7 @@ private: if (asyncData.Checkpoint.Defined()) { channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } - Channels->SendChannelData(std::move(channelData), true); + Channels->SendChannelData(std::move(channelData)); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index d60aa2f81e9..48e591d875e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -65,7 +65,7 @@ TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& tx for (auto& channel : task.GetOutputs(i).GetChannels()) { TOutputChannelState outputChannel; outputChannel.ChannelId = channel.GetId(); - outputChannel.PeerState.ActualizeFreeSpace(channelBufferSize); + outputChannel.PeerState.PeerFreeSpace = channelBufferSize; if (channel.GetDstEndpoint().HasActorId()) { outputChannel.Peer = ActorIdFromProto(channel.GetDstEndpoint().GetActorId()); @@ -205,13 +205,22 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr& // remove all messages with seqNo <= ackSeqNo auto it = outputChannel.InFlight.begin(); while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) { - outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount()); + Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightBytes >= it->second.Data.PayloadSize()); + Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightRows >= it->second.Data.RowCount()); + Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightCount >= 1); + + outputChannel.PeerState.InFlightBytes -= it->second.Data.PayloadSize(); + outputChannel.PeerState.InFlightRows -= it->second.Data.RowCount(); + outputChannel.PeerState.InFlightCount -= 1; it = outputChannel.InFlight.erase(it); } - outputChannel.PeerState.ActualizeFreeSpace(record.GetFreeSpace()); + outputChannel.PeerState.PrevPeerFreeSpace = outputChannel.PeerState.PeerFreeSpace; + outputChannel.PeerState.PeerFreeSpace = record.GetFreeSpace(); - LOG_T("PeerState, peerState:(" << outputChannel.PeerState.DebugString() << ")" + LOG_T("PeerState, freeSpace: " << outputChannel.PeerState.PeerFreeSpace + << ", inflight bytes: " << outputChannel.PeerState.InFlightBytes + << ", inflight count: " << outputChannel.PeerState.InFlightCount << ", sentSeqNo: " << outputChannel.LastSentSeqNo << ", ackSeqNo: " << record.GetSeqNo()); @@ -551,13 +560,8 @@ void TDqComputeActorChannels::SetOutputChannelPeer(ui64 channelId, const TActorI outputChannel.Peer = peer; } -bool TDqComputeActorChannels::HasFreeMemoryInChannel(const ui64 channelId) const { - const TOutputChannelState& outputChannel = OutCh(channelId); - return outputChannel.PeerState.HasFreeMemory(); -} - -bool TDqComputeActorChannels::CanSendChannelData(const ui64 channelId) const { - const TOutputChannelState& outputChannel = OutCh(channelId); +bool TDqComputeActorChannels::CanSendChannelData(ui64 channelId) { + TOutputChannelState& outputChannel = OutCh(channelId); return outputChannel.Peer && (!outputChannel.Finished || SupportCheckpoints) && !outputChannel.RetryState; } @@ -566,16 +570,16 @@ bool TDqComputeActorChannels::ShouldSkipData(ui64 channelId) { return outputChannel.Finished && !SupportCheckpoints; } -void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, const bool needAck) { +void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) { TOutputChannelState& outputChannel = OutCh(channelData.Proto.GetChannelId()); YQL_ENSURE(!outputChannel.Finished || SupportCheckpoints); YQL_ENSURE(!outputChannel.RetryState); - const ui64 seqNo = ++outputChannel.LastSentSeqNo; - const ui32 chunkBytes = channelData.PayloadSize(); - const ui32 chunkRows = channelData.RowCount(); - const bool finished = channelData.Proto.GetFinished(); + ui64 seqNo = ++outputChannel.LastSentSeqNo; + ui32 chunkBytes = channelData.Proto.GetData().GetRaw().size() + channelData.Payload.size(); + ui32 chunkRows = channelData.Proto.GetData().GetRows(); + bool finished = channelData.Proto.GetFinished(); LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId() << ", peer: " << *outputChannel.Peer @@ -617,10 +621,11 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con outputChannel.Finished = finished; ui32 flags = CalcMessageFlags(*outputChannel.Peer); - dataEv->Record.SetNoAck(!needAck); Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId); - outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows); + outputChannel.PeerState.InFlightBytes += chunkBytes; + outputChannel.PeerState.InFlightRows += chunkRows; + outputChannel.PeerState.InFlightCount += 1; } bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) { @@ -796,13 +801,7 @@ TDqComputeActorChannels::TInputChannelState& TDqComputeActorChannels::InCh(ui64 return *ch; } -const TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) const { - auto ch = OutputChannelsMap.FindPtr(channelId); - YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); - return *ch; -} - -TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) { +TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(ui64 channelId) { auto ch = OutputChannelsMap.FindPtr(channelId); YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId); return *ch; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h index c878c228d12..cade7d4fb70 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h @@ -7,59 +7,17 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <library/cpp/actors/core/interconnect.h> -#include <library/cpp/actors/core/log.h> -#include <util/generic/noncopyable.h> namespace NYql::NDq { class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels> { public: - struct TPeerState: NNonCopyable::TMoveOnly { - private: - static const ui32 InterconnectHeadersSize = 96; - i64 InFlightBytes = 0; - i64 InFlightRows = 0; - i32 InFlightCount = 0; + struct TPeerState { i64 PeerFreeSpace = 0; - public: - i64 GetFreeMemory() const { - return PeerFreeSpace - InFlightBytes; - } - - bool NeedAck() const { - return (InFlightCount % 16 == 0) || !HasFreeMemory(); - } - - TString DebugString() const { - return TStringBuilder() << - "freeSpace:" << PeerFreeSpace << ";" << - "inFlightBytes:" << InFlightBytes << ";" << - "inFlightCount:" << InFlightCount << ";" - ; - } - - bool HasFreeMemory() const { - return PeerFreeSpace >= InFlightBytes; - } - - void ActualizeFreeSpace(const i64 actual) { - PeerFreeSpace = actual; - } - - void AddInFlight(const ui64 bytes, const ui64 rows) { - InFlightBytes += bytes + InterconnectHeadersSize; - InFlightRows += rows; - InFlightCount += 1; - } - - void RemoveInFlight(const ui64 bytes, const ui64 rows) { - InFlightBytes -= (bytes + InterconnectHeadersSize); - Y_VERIFY(InFlightBytes >= 0); - InFlightRows -= rows; - Y_VERIFY(InFlightRows >= 0); - InFlightCount -= 1; - Y_VERIFY(InFlightCount >= 0); - } + ui64 InFlightBytes = 0; + ui64 InFlightRows = 0; + ui32 InFlightCount = 0; + i64 PrevPeerFreeSpace = 0; }; struct ICallbacks { @@ -117,9 +75,8 @@ public: void SetCheckpointsSupport(); // Finished channels will be polled for checkpoints. void SetInputChannelPeer(ui64 channelId, const NActors::TActorId& peer); void SetOutputChannelPeer(ui64 channelId, const NActors::TActorId& peer); - bool CanSendChannelData(const ui64 channelId) const; - bool HasFreeMemoryInChannel(const ui64 channelId) const; - void SendChannelData(TChannelDataOOB&& channelData, const bool needAck); + bool CanSendChannelData(ui64 channelId); + void SendChannelData(TChannelDataOOB&& channelData); void SendChannelDataAck(i64 channelId, i64 freeSpace); bool PollChannel(ui64 channelId, i64 freeSpace); bool CheckInFlight(const TString& prefix); @@ -222,7 +179,6 @@ private: ui32 CalcMessageFlags(const NActors::TActorId& peer); TInputChannelState& InCh(ui64 channelId); TOutputChannelState& OutCh(ui64 channelId); - const TOutputChannelState& OutCh(const ui64 channelId) const; private: const NActors::TActorId Owner; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 274e2c03647..ef64de90857 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -350,7 +350,7 @@ protected: // Send checkpoints to output channels. ProcessOutputsImpl(ERunStatus::Finished); - return true; // returns true, when channels were handled synchronously + return true; // returns true, when channels were handled syncronously } void ProcessOutputsImpl(ERunStatus status) { @@ -379,7 +379,8 @@ protected: if (!outputChannel.Finished || Checkpoints) { if (Channels->CanSendChannelData(channelId)) { - DrainOutputChannel(outputChannel); + auto peerState = Channels->GetOutputChannelInFlightState(channelId); + DrainOutputChannel(outputChannel, peerState); } else { ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; } @@ -1016,87 +1017,6 @@ protected: bool Changed = false; }; TMaybe<TAsyncData> AsyncData; - - class TDrainedChannelMessage { - private: - TDqSerializedBatch Data; - NDqProto::TWatermark Watermark; - NDqProto::TCheckpoint Checkpoint; - - ui32 DataSize = 0; - ui32 WatermarkSize = 0; - ui32 CheckpointSize = 0; - - bool HasData = false; - bool HasWatermark = false; - bool HasCheckpoint = false; - bool Finished = false; - public: - const NDqProto::TWatermark* GetWatermarkOptional() const { - return HasWatermark ? &Watermark : nullptr; - } - const NDqProto::TCheckpoint* GetCheckpointOptional() const { - return HasCheckpoint ? &Checkpoint : nullptr; - } - - TChannelDataOOB BuildChannelData(const ui64 channelId) { - TChannelDataOOB channelData; - channelData.Proto.SetChannelId(channelId); - channelData.Proto.SetFinished(Finished); - if (HasData) { - channelData.Proto.MutableData()->Swap(&Data.Proto); - channelData.Payload = std::move(Data.Payload); - } - if (HasWatermark) { - channelData.Proto.MutableWatermark()->Swap(&Watermark); - } - if (HasCheckpoint) { - channelData.Proto.MutableCheckpoint()->Swap(&Checkpoint); - } - - Y_VERIFY(HasData || HasWatermark || HasCheckpoint || Finished); - return channelData; - } - - bool ReadData(const TOutputChannelInfo& outputChannel) { - auto channel = outputChannel.Channel; - - HasData = channel->Pop(Data); - HasWatermark = channel->Pop(Watermark); - HasCheckpoint = channel->Pop(Checkpoint); - Finished = !outputChannel.Finished && channel->IsFinished(); - - if (!HasData && !HasWatermark && !HasCheckpoint && !Finished) { - return false; - } - - DataSize = Data.Size(); - WatermarkSize = Watermark.ByteSize(); - CheckpointSize = Checkpoint.ByteSize(); - - return true; - } - }; - - std::vector<TDrainedChannelMessage> DrainChannel(const ui32 countLimit) { - std::vector<TDrainedChannelMessage> result; - if (Finished) { - Y_VERIFY(Channel->IsFinished()); - return result; - } - result.reserve(countLimit); - for (ui32 i = 0; i < countLimit && !Finished; ++i) { - TDrainedChannelMessage message; - if (!message.ReadData(*this)) { - break; - } - result.emplace_back(std::move(message)); - if (Channel->IsFinished()) { - Finished = true; - } - } - return result; - } }; struct TAsyncOutputInfoBase { @@ -1345,9 +1265,9 @@ protected: protected: - void UpdateBlocked(TOutputChannelInfo& outputChannel, const bool blocked) { + void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) { if (Y_UNLIKELY(outputChannel.Stats)) { - if (blocked) { + if (toSend <= 0) { outputChannel.Stats->BlockedByCapacity++; if (!outputChannel.Stats->StartBlockedTime) { outputChannel.Stats->StartBlockedTime = TInstant::Now(); @@ -1367,54 +1287,94 @@ private: return MemoryQuota->GetProfileStats(); } - virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel) { + virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) { YQL_ENSURE(!outputChannel.Finished || Checkpoints); const bool wasFinished = outputChannel.Finished; auto channelId = outputChannel.Channel->GetChannelId(); + const ui32 allowedOvercommit = AllowedChannelsOvercommit(); + + const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes; + CA_LOG_T("About to drain channelId: " << channelId << ", hasPeer: " << outputChannel.HasPeer + << ", peerFreeSpace: " << peerState.PeerFreeSpace + << ", inFlightBytes: " << peerState.InFlightBytes + << ", inFlightRows: " << peerState.InFlightRows + << ", inFlightCount: " << peerState.InFlightCount + << ", allowedOvercommit: " << allowedOvercommit + << ", toSend: " << toSend << ", finished: " << outputChannel.Channel->IsFinished()); ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - UpdateBlocked(outputChannel, !Channels->HasFreeMemoryInChannel(channelId)); + UpdateBlocked(outputChannel, toSend); - ui32 sentChunks = 0; - while ((!outputChannel.Finished || Checkpoints) && - Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) - { - const static ui32 drainPackSize = 16; - std::vector<typename TOutputChannelInfo::TDrainedChannelMessage> channelData = outputChannel.DrainChannel(drainPackSize); - ui32 idx = 0; - for (auto&& i : channelData) { - if (auto* w = i.GetWatermarkOptional()) { - CA_LOG_I("Resume inputs by watermark"); - // This is excessive, inputs should be resumed after async CA received response with watermark from task runner. - // But, let it be here, it's better to have the same code as in checkpoints - ResumeInputsByWatermark(TInstant::MicroSeconds(w->GetTimestampUs())); - } - if (i.GetCheckpointOptional()) { - CA_LOG_I("Resume inputs by checkpoint"); - ResumeInputsByCheckpoint(); - } - - Channels->SendChannelData(i.BuildChannelData(outputChannel.ChannelId), ++idx == channelData.size()); - ++sentChunks; - } - if (drainPackSize != channelData.size()) { - if (!outputChannel.Finished) { - CA_LOG_T("output channelId: " << outputChannel.ChannelId << ", nothing to send and is not finished"); - } + i64 remains = toSend; + while (remains > 0 && (!outputChannel.Finished || Checkpoints)) { + ui32 sent = this->SendChannelDataChunk(outputChannel); + if (sent == 0) { break; } + remains -= sent; } ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks; + ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend; + } + + ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) { + auto channel = outputChannel.Channel; + + TDqSerializedBatch data; + NDqProto::TWatermark watermark; + NDqProto::TCheckpoint checkpoint; + + bool hasData = channel->Pop(data); + bool hasWatermark = channel->Pop(watermark); + bool hasCheckpoint = channel->Pop(checkpoint); + if (!hasData && !hasWatermark && !hasCheckpoint) { + if (!channel->IsFinished()) { + CA_LOG_T("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished"); + return 0; // channel is empty and not finished yet + } + } + const bool wasFinished = outputChannel.Finished; + outputChannel.Finished = channel->IsFinished(); + const bool becameFinished = !wasFinished && outputChannel.Finished; + + ui32 dataSize = data.Size(); + ui32 watermarkSize = watermark.ByteSize(); + ui32 checkpointSize = checkpoint.ByteSize(); + + TChannelDataOOB channelData; + channelData.Proto.SetChannelId(channel->GetChannelId()); + channelData.Proto.SetFinished(outputChannel.Finished); + if (hasData) { + channelData.Proto.MutableData()->Swap(&data.Proto); + channelData.Payload = std::move(data.Payload); + } + if (hasWatermark) { + channelData.Proto.MutableWatermark()->Swap(&watermark); + CA_LOG_I("Resume inputs by watermark"); + // This is excessive, inputs should be resumed after async CA received response with watermark from task runner. + // But, let it be here, it's better to have the same code as in checkpoints + ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs())); + } + if (hasCheckpoint) { + channelData.Proto.MutableCheckpoint()->Swap(&checkpoint); + CA_LOG_I("Resume inputs by checkpoint"); + ResumeInputsByCheckpoint(); + } + + if (hasData || hasWatermark || hasCheckpoint || becameFinished) { + Channels->SendChannelData(std::move(channelData)); + return dataSize + watermarkSize + checkpointSize; + } + return 0; } virtual void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) { |