aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-24 19:19:16 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-24 19:19:16 +0300
commit86488464a705e3512cdba41ed37b203d84ba17b6 (patch)
treebd0e15c77c75008fedf5fc7a33e68d0b5ed4a949
parent25ffc448423b8f6ab955a036ec8f15a206a4ba93 (diff)
downloadydb-86488464a705e3512cdba41ed37b203d84ba17b6.tar.gz
KIKIMR-18556,KIKIMR-18783: reuse 11963924,11955437
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp36
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp49
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h58
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h188
4 files changed, 205 insertions, 126 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 53494e31c36..938f5f3ba32 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -194,7 +194,7 @@ private:
return TaskRunnerStats.GetInputTransformStats(inputIdx);
}
- void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override {
+ void DrainOutputChannel(TOutputChannelInfo& outputChannel) override {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
if (outputChannel.PopStarted) {
@@ -204,37 +204,29 @@ private:
const bool wasFinished = outputChannel.Finished;
auto channelId = outputChannel.ChannelId;
- const ui32 allowedOvercommit = AllowedChannelsOvercommit();
-
- const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes;
+ const auto& peerState = Channels->GetOutputChannelInFlightState(channelId);
CA_LOG_T("About to drain channelId: " << channelId
<< ", hasPeer: " << outputChannel.HasPeer
- << ", peerFreeSpace: " << peerState.PeerFreeSpace
- << ", inFlightBytes: " << peerState.InFlightBytes
- << ", inFlightRows: " << peerState.InFlightRows
- << ", inFlightCount: " << peerState.InFlightCount
- << ", allowedOvercommit: " << allowedOvercommit
- << ", toSend: " << toSend
+ << ", peerState:(" << peerState.DebugString() << ")"
// << ", finished: " << outputChannel.Channel->IsFinished());
);
outputChannel.PopStarted = true;
+ const bool hasFreeMemory = peerState.HasFreeMemory();
+ UpdateBlocked(outputChannel, !hasFreeMemory);
ProcessOutputsState.Inflight++;
- UpdateBlocked(outputChannel, toSend);
- if (toSend <= 0) {
+ if (!hasFreeMemory) {
CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId
- << ". To send: " << toSend
- << ". Free space: " << peerState.PeerFreeSpace
- << ". Inflight: " << peerState.InFlightBytes
- << ". Allowed overcommit: " << allowedOvercommit);
+ << ", peerState:(" << peerState.DebugString() << ")"
+ );
auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId);
Y_VERIFY(!ev->Finished);
Send(SelfId(), std::move(ev)); // try again, ev.Finished == false
return;
}
- Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend));
+ Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, peerState.GetFreeMemory()));
}
void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override {
@@ -547,8 +539,7 @@ private:
auto& source = it->second;
source.PushStarted = false;
source.FreeSpace = ev->Get()->FreeSpaceLeft;
- ProcessSourcesState.Inflight--;
- if (ProcessSourcesState.Inflight == 0) {
+ if (--ProcessSourcesState.Inflight == 0) {
CA_LOG_T("Send TEvContinueRun on OnAsyncInputPushFinished");
AskContinueRun(Nothing(), false);
}
@@ -584,6 +575,9 @@ private:
if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution.
return false;
}
+ if (!shouldSkipData && !Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) {
+ return false;
+ }
auto& asyncData = *outputChannel.AsyncData;
outputChannel.Finished = asyncData.Finished || shouldSkipData;
@@ -628,7 +622,7 @@ private:
if (lastChunk && asyncData.Checkpoint.Defined()) {
channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
- Channels->SendChannelData(std::move(channelData));
+ Channels->SendChannelData(std::move(channelData), i + 1 == asyncData.Data.size());
}
if (asyncData.Data.empty() && asyncData.Changed) {
TChannelDataOOB channelData;
@@ -640,7 +634,7 @@ private:
if (asyncData.Checkpoint.Defined()) {
channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
- Channels->SendChannelData(std::move(channelData));
+ Channels->SendChannelData(std::move(channelData), true);
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 48e591d875e..d60aa2f81e9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -65,7 +65,7 @@ TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& tx
for (auto& channel : task.GetOutputs(i).GetChannels()) {
TOutputChannelState outputChannel;
outputChannel.ChannelId = channel.GetId();
- outputChannel.PeerState.PeerFreeSpace = channelBufferSize;
+ outputChannel.PeerState.ActualizeFreeSpace(channelBufferSize);
if (channel.GetDstEndpoint().HasActorId()) {
outputChannel.Peer = ActorIdFromProto(channel.GetDstEndpoint().GetActorId());
@@ -205,22 +205,13 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
// remove all messages with seqNo <= ackSeqNo
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
- Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightBytes >= it->second.Data.PayloadSize());
- Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightRows >= it->second.Data.RowCount());
- Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightCount >= 1);
-
- outputChannel.PeerState.InFlightBytes -= it->second.Data.PayloadSize();
- outputChannel.PeerState.InFlightRows -= it->second.Data.RowCount();
- outputChannel.PeerState.InFlightCount -= 1;
+ outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
it = outputChannel.InFlight.erase(it);
}
- outputChannel.PeerState.PrevPeerFreeSpace = outputChannel.PeerState.PeerFreeSpace;
- outputChannel.PeerState.PeerFreeSpace = record.GetFreeSpace();
+ outputChannel.PeerState.ActualizeFreeSpace(record.GetFreeSpace());
- LOG_T("PeerState, freeSpace: " << outputChannel.PeerState.PeerFreeSpace
- << ", inflight bytes: " << outputChannel.PeerState.InFlightBytes
- << ", inflight count: " << outputChannel.PeerState.InFlightCount
+ LOG_T("PeerState, peerState:(" << outputChannel.PeerState.DebugString() << ")"
<< ", sentSeqNo: " << outputChannel.LastSentSeqNo
<< ", ackSeqNo: " << record.GetSeqNo());
@@ -560,8 +551,13 @@ void TDqComputeActorChannels::SetOutputChannelPeer(ui64 channelId, const TActorI
outputChannel.Peer = peer;
}
-bool TDqComputeActorChannels::CanSendChannelData(ui64 channelId) {
- TOutputChannelState& outputChannel = OutCh(channelId);
+bool TDqComputeActorChannels::HasFreeMemoryInChannel(const ui64 channelId) const {
+ const TOutputChannelState& outputChannel = OutCh(channelId);
+ return outputChannel.PeerState.HasFreeMemory();
+}
+
+bool TDqComputeActorChannels::CanSendChannelData(const ui64 channelId) const {
+ const TOutputChannelState& outputChannel = OutCh(channelId);
return outputChannel.Peer && (!outputChannel.Finished || SupportCheckpoints) && !outputChannel.RetryState;
}
@@ -570,16 +566,16 @@ bool TDqComputeActorChannels::ShouldSkipData(ui64 channelId) {
return outputChannel.Finished && !SupportCheckpoints;
}
-void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) {
+void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, const bool needAck) {
TOutputChannelState& outputChannel = OutCh(channelData.Proto.GetChannelId());
YQL_ENSURE(!outputChannel.Finished || SupportCheckpoints);
YQL_ENSURE(!outputChannel.RetryState);
- ui64 seqNo = ++outputChannel.LastSentSeqNo;
- ui32 chunkBytes = channelData.Proto.GetData().GetRaw().size() + channelData.Payload.size();
- ui32 chunkRows = channelData.Proto.GetData().GetRows();
- bool finished = channelData.Proto.GetFinished();
+ const ui64 seqNo = ++outputChannel.LastSentSeqNo;
+ const ui32 chunkBytes = channelData.PayloadSize();
+ const ui32 chunkRows = channelData.RowCount();
+ const bool finished = channelData.Proto.GetFinished();
LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
<< ", peer: " << *outputChannel.Peer
@@ -621,11 +617,10 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) {
outputChannel.Finished = finished;
ui32 flags = CalcMessageFlags(*outputChannel.Peer);
+ dataEv->Record.SetNoAck(!needAck);
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
- outputChannel.PeerState.InFlightBytes += chunkBytes;
- outputChannel.PeerState.InFlightRows += chunkRows;
- outputChannel.PeerState.InFlightCount += 1;
+ outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
}
bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {
@@ -801,7 +796,13 @@ TDqComputeActorChannels::TInputChannelState& TDqComputeActorChannels::InCh(ui64
return *ch;
}
-TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(ui64 channelId) {
+const TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) const {
+ auto ch = OutputChannelsMap.FindPtr(channelId);
+ YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId);
+ return *ch;
+}
+
+TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) {
auto ch = OutputChannelsMap.FindPtr(channelId);
YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId);
return *ch;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
index cade7d4fb70..c878c228d12 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
@@ -7,17 +7,59 @@
#include <ydb/library/yql/dq/actors/dq.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/log.h>
+#include <util/generic/noncopyable.h>
namespace NYql::NDq {
class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels> {
public:
- struct TPeerState {
+ struct TPeerState: NNonCopyable::TMoveOnly {
+ private:
+ static const ui32 InterconnectHeadersSize = 96;
+ i64 InFlightBytes = 0;
+ i64 InFlightRows = 0;
+ i32 InFlightCount = 0;
i64 PeerFreeSpace = 0;
- ui64 InFlightBytes = 0;
- ui64 InFlightRows = 0;
- ui32 InFlightCount = 0;
- i64 PrevPeerFreeSpace = 0;
+ public:
+ i64 GetFreeMemory() const {
+ return PeerFreeSpace - InFlightBytes;
+ }
+
+ bool NeedAck() const {
+ return (InFlightCount % 16 == 0) || !HasFreeMemory();
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() <<
+ "freeSpace:" << PeerFreeSpace << ";" <<
+ "inFlightBytes:" << InFlightBytes << ";" <<
+ "inFlightCount:" << InFlightCount << ";"
+ ;
+ }
+
+ bool HasFreeMemory() const {
+ return PeerFreeSpace >= InFlightBytes;
+ }
+
+ void ActualizeFreeSpace(const i64 actual) {
+ PeerFreeSpace = actual;
+ }
+
+ void AddInFlight(const ui64 bytes, const ui64 rows) {
+ InFlightBytes += bytes + InterconnectHeadersSize;
+ InFlightRows += rows;
+ InFlightCount += 1;
+ }
+
+ void RemoveInFlight(const ui64 bytes, const ui64 rows) {
+ InFlightBytes -= (bytes + InterconnectHeadersSize);
+ Y_VERIFY(InFlightBytes >= 0);
+ InFlightRows -= rows;
+ Y_VERIFY(InFlightRows >= 0);
+ InFlightCount -= 1;
+ Y_VERIFY(InFlightCount >= 0);
+ }
};
struct ICallbacks {
@@ -75,8 +117,9 @@ public:
void SetCheckpointsSupport(); // Finished channels will be polled for checkpoints.
void SetInputChannelPeer(ui64 channelId, const NActors::TActorId& peer);
void SetOutputChannelPeer(ui64 channelId, const NActors::TActorId& peer);
- bool CanSendChannelData(ui64 channelId);
- void SendChannelData(TChannelDataOOB&& channelData);
+ bool CanSendChannelData(const ui64 channelId) const;
+ bool HasFreeMemoryInChannel(const ui64 channelId) const;
+ void SendChannelData(TChannelDataOOB&& channelData, const bool needAck);
void SendChannelDataAck(i64 channelId, i64 freeSpace);
bool PollChannel(ui64 channelId, i64 freeSpace);
bool CheckInFlight(const TString& prefix);
@@ -179,6 +222,7 @@ private:
ui32 CalcMessageFlags(const NActors::TActorId& peer);
TInputChannelState& InCh(ui64 channelId);
TOutputChannelState& OutCh(ui64 channelId);
+ const TOutputChannelState& OutCh(const ui64 channelId) const;
private:
const NActors::TActorId Owner;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index ef64de90857..274e2c03647 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -350,7 +350,7 @@ protected:
// Send checkpoints to output channels.
ProcessOutputsImpl(ERunStatus::Finished);
- return true; // returns true, when channels were handled syncronously
+ return true; // returns true, when channels were handled synchronously
}
void ProcessOutputsImpl(ERunStatus status) {
@@ -379,8 +379,7 @@ protected:
if (!outputChannel.Finished || Checkpoints) {
if (Channels->CanSendChannelData(channelId)) {
- auto peerState = Channels->GetOutputChannelInFlightState(channelId);
- DrainOutputChannel(outputChannel, peerState);
+ DrainOutputChannel(outputChannel);
} else {
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
}
@@ -1017,6 +1016,87 @@ protected:
bool Changed = false;
};
TMaybe<TAsyncData> AsyncData;
+
+ class TDrainedChannelMessage {
+ private:
+ TDqSerializedBatch Data;
+ NDqProto::TWatermark Watermark;
+ NDqProto::TCheckpoint Checkpoint;
+
+ ui32 DataSize = 0;
+ ui32 WatermarkSize = 0;
+ ui32 CheckpointSize = 0;
+
+ bool HasData = false;
+ bool HasWatermark = false;
+ bool HasCheckpoint = false;
+ bool Finished = false;
+ public:
+ const NDqProto::TWatermark* GetWatermarkOptional() const {
+ return HasWatermark ? &Watermark : nullptr;
+ }
+ const NDqProto::TCheckpoint* GetCheckpointOptional() const {
+ return HasCheckpoint ? &Checkpoint : nullptr;
+ }
+
+ TChannelDataOOB BuildChannelData(const ui64 channelId) {
+ TChannelDataOOB channelData;
+ channelData.Proto.SetChannelId(channelId);
+ channelData.Proto.SetFinished(Finished);
+ if (HasData) {
+ channelData.Proto.MutableData()->Swap(&Data.Proto);
+ channelData.Payload = std::move(Data.Payload);
+ }
+ if (HasWatermark) {
+ channelData.Proto.MutableWatermark()->Swap(&Watermark);
+ }
+ if (HasCheckpoint) {
+ channelData.Proto.MutableCheckpoint()->Swap(&Checkpoint);
+ }
+
+ Y_VERIFY(HasData || HasWatermark || HasCheckpoint || Finished);
+ return channelData;
+ }
+
+ bool ReadData(const TOutputChannelInfo& outputChannel) {
+ auto channel = outputChannel.Channel;
+
+ HasData = channel->Pop(Data);
+ HasWatermark = channel->Pop(Watermark);
+ HasCheckpoint = channel->Pop(Checkpoint);
+ Finished = !outputChannel.Finished && channel->IsFinished();
+
+ if (!HasData && !HasWatermark && !HasCheckpoint && !Finished) {
+ return false;
+ }
+
+ DataSize = Data.Size();
+ WatermarkSize = Watermark.ByteSize();
+ CheckpointSize = Checkpoint.ByteSize();
+
+ return true;
+ }
+ };
+
+ std::vector<TDrainedChannelMessage> DrainChannel(const ui32 countLimit) {
+ std::vector<TDrainedChannelMessage> result;
+ if (Finished) {
+ Y_VERIFY(Channel->IsFinished());
+ return result;
+ }
+ result.reserve(countLimit);
+ for (ui32 i = 0; i < countLimit && !Finished; ++i) {
+ TDrainedChannelMessage message;
+ if (!message.ReadData(*this)) {
+ break;
+ }
+ result.emplace_back(std::move(message));
+ if (Channel->IsFinished()) {
+ Finished = true;
+ }
+ }
+ return result;
+ }
};
struct TAsyncOutputInfoBase {
@@ -1265,9 +1345,9 @@ protected:
protected:
- void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) {
+ void UpdateBlocked(TOutputChannelInfo& outputChannel, const bool blocked) {
if (Y_UNLIKELY(outputChannel.Stats)) {
- if (toSend <= 0) {
+ if (blocked) {
outputChannel.Stats->BlockedByCapacity++;
if (!outputChannel.Stats->StartBlockedTime) {
outputChannel.Stats->StartBlockedTime = TInstant::Now();
@@ -1287,94 +1367,54 @@ private:
return MemoryQuota->GetProfileStats();
}
- virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) {
+ virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel) {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
const bool wasFinished = outputChannel.Finished;
auto channelId = outputChannel.Channel->GetChannelId();
- const ui32 allowedOvercommit = AllowedChannelsOvercommit();
-
- const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes;
-
CA_LOG_T("About to drain channelId: " << channelId
<< ", hasPeer: " << outputChannel.HasPeer
- << ", peerFreeSpace: " << peerState.PeerFreeSpace
- << ", inFlightBytes: " << peerState.InFlightBytes
- << ", inFlightRows: " << peerState.InFlightRows
- << ", inFlightCount: " << peerState.InFlightCount
- << ", allowedOvercommit: " << allowedOvercommit
- << ", toSend: " << toSend
<< ", finished: " << outputChannel.Channel->IsFinished());
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
- UpdateBlocked(outputChannel, toSend);
+ UpdateBlocked(outputChannel, !Channels->HasFreeMemoryInChannel(channelId));
- i64 remains = toSend;
- while (remains > 0 && (!outputChannel.Finished || Checkpoints)) {
- ui32 sent = this->SendChannelDataChunk(outputChannel);
- if (sent == 0) {
+ ui32 sentChunks = 0;
+ while ((!outputChannel.Finished || Checkpoints) &&
+ Channels->HasFreeMemoryInChannel(outputChannel.ChannelId))
+ {
+ const static ui32 drainPackSize = 16;
+ std::vector<typename TOutputChannelInfo::TDrainedChannelMessage> channelData = outputChannel.DrainChannel(drainPackSize);
+ ui32 idx = 0;
+ for (auto&& i : channelData) {
+ if (auto* w = i.GetWatermarkOptional()) {
+ CA_LOG_I("Resume inputs by watermark");
+ // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
+ // But, let it be here, it's better to have the same code as in checkpoints
+ ResumeInputsByWatermark(TInstant::MicroSeconds(w->GetTimestampUs()));
+ }
+ if (i.GetCheckpointOptional()) {
+ CA_LOG_I("Resume inputs by checkpoint");
+ ResumeInputsByCheckpoint();
+ }
+
+ Channels->SendChannelData(i.BuildChannelData(outputChannel.ChannelId), ++idx == channelData.size());
+ ++sentChunks;
+ }
+ if (drainPackSize != channelData.size()) {
+ if (!outputChannel.Finished) {
+ CA_LOG_T("output channelId: " << outputChannel.ChannelId << ", nothing to send and is not finished");
+ }
break;
}
- remains -= sent;
}
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
- ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend;
- }
-
- ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) {
- auto channel = outputChannel.Channel;
-
- TDqSerializedBatch data;
- NDqProto::TWatermark watermark;
- NDqProto::TCheckpoint checkpoint;
-
- bool hasData = channel->Pop(data);
- bool hasWatermark = channel->Pop(watermark);
- bool hasCheckpoint = channel->Pop(checkpoint);
- if (!hasData && !hasWatermark && !hasCheckpoint) {
- if (!channel->IsFinished()) {
- CA_LOG_T("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished");
- return 0; // channel is empty and not finished yet
- }
- }
- const bool wasFinished = outputChannel.Finished;
- outputChannel.Finished = channel->IsFinished();
- const bool becameFinished = !wasFinished && outputChannel.Finished;
-
- ui32 dataSize = data.Size();
- ui32 watermarkSize = watermark.ByteSize();
- ui32 checkpointSize = checkpoint.ByteSize();
-
- TChannelDataOOB channelData;
- channelData.Proto.SetChannelId(channel->GetChannelId());
- channelData.Proto.SetFinished(outputChannel.Finished);
- if (hasData) {
- channelData.Proto.MutableData()->Swap(&data.Proto);
- channelData.Payload = std::move(data.Payload);
- }
- if (hasWatermark) {
- channelData.Proto.MutableWatermark()->Swap(&watermark);
- CA_LOG_I("Resume inputs by watermark");
- // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
- // But, let it be here, it's better to have the same code as in checkpoints
- ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs()));
- }
- if (hasCheckpoint) {
- channelData.Proto.MutableCheckpoint()->Swap(&checkpoint);
- CA_LOG_I("Resume inputs by checkpoint");
- ResumeInputsByCheckpoint();
- }
-
- if (hasData || hasWatermark || hasCheckpoint || becameFinished) {
- Channels->SendChannelData(std::move(channelData));
- return dataSize + watermarkSize + checkpointSize;
- }
- return 0;
+ ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks;
}
virtual void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) {