aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-24 14:38:41 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-24 14:38:41 +0300
commit5d8ffcc517280cf9d851da16c811b2052e16df04 (patch)
tree7ee27f618e8c2f613126e8b4c6557a222db976b3
parentd335551d092a04a48aa7ca00f95e4bc5cb066816 (diff)
downloadydb-5d8ffcc517280cf9d851da16c811b2052e16df04.tar.gz
temporary revert. fix tests
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp35
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp49
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h58
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h188
4 files changed, 125 insertions, 205 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index a6c204d4e8b..53494e31c36 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -194,7 +194,7 @@ private:
return TaskRunnerStats.GetInputTransformStats(inputIdx);
}
- void DrainOutputChannel(TOutputChannelInfo& outputChannel) override {
+ void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
if (outputChannel.PopStarted) {
@@ -204,29 +204,37 @@ private:
const bool wasFinished = outputChannel.Finished;
auto channelId = outputChannel.ChannelId;
- const auto& peerState = Channels->GetOutputChannelInFlightState(channelId);
+ const ui32 allowedOvercommit = AllowedChannelsOvercommit();
+
+ const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes;
CA_LOG_T("About to drain channelId: " << channelId
<< ", hasPeer: " << outputChannel.HasPeer
- << ", peerState:(" << peerState.DebugString() << ")"
+ << ", peerFreeSpace: " << peerState.PeerFreeSpace
+ << ", inFlightBytes: " << peerState.InFlightBytes
+ << ", inFlightRows: " << peerState.InFlightRows
+ << ", inFlightCount: " << peerState.InFlightCount
+ << ", allowedOvercommit: " << allowedOvercommit
+ << ", toSend: " << toSend
// << ", finished: " << outputChannel.Channel->IsFinished());
);
outputChannel.PopStarted = true;
- const bool hasFreeMemory = peerState.HasFreeMemory();
- UpdateBlocked(outputChannel, !hasFreeMemory);
- if (!hasFreeMemory) {
+ ProcessOutputsState.Inflight++;
+ UpdateBlocked(outputChannel, toSend);
+ if (toSend <= 0) {
CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId
- << ", peerState:(" << peerState.DebugString() << ")"
- );
+ << ". To send: " << toSend
+ << ". Free space: " << peerState.PeerFreeSpace
+ << ". Inflight: " << peerState.InFlightBytes
+ << ". Allowed overcommit: " << allowedOvercommit);
auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId);
Y_VERIFY(!ev->Finished);
Send(SelfId(), std::move(ev)); // try again, ev.Finished == false
return;
}
- ProcessOutputsState.Inflight++;
- Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, peerState.GetFreeMemory()));
+ Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend));
}
void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override {
@@ -576,9 +584,6 @@ private:
if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution.
return false;
}
- if (!Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) {
- return false;
- }
auto& asyncData = *outputChannel.AsyncData;
outputChannel.Finished = asyncData.Finished || shouldSkipData;
@@ -623,7 +628,7 @@ private:
if (lastChunk && asyncData.Checkpoint.Defined()) {
channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
- Channels->SendChannelData(std::move(channelData), i + 1 == asyncData.Data.size());
+ Channels->SendChannelData(std::move(channelData));
}
if (asyncData.Data.empty() && asyncData.Changed) {
TChannelDataOOB channelData;
@@ -635,7 +640,7 @@ private:
if (asyncData.Checkpoint.Defined()) {
channelData.Proto.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
- Channels->SendChannelData(std::move(channelData), true);
+ Channels->SendChannelData(std::move(channelData));
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index d60aa2f81e9..48e591d875e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -65,7 +65,7 @@ TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& tx
for (auto& channel : task.GetOutputs(i).GetChannels()) {
TOutputChannelState outputChannel;
outputChannel.ChannelId = channel.GetId();
- outputChannel.PeerState.ActualizeFreeSpace(channelBufferSize);
+ outputChannel.PeerState.PeerFreeSpace = channelBufferSize;
if (channel.GetDstEndpoint().HasActorId()) {
outputChannel.Peer = ActorIdFromProto(channel.GetDstEndpoint().GetActorId());
@@ -205,13 +205,22 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
// remove all messages with seqNo <= ackSeqNo
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
- outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
+ Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightBytes >= it->second.Data.PayloadSize());
+ Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightRows >= it->second.Data.RowCount());
+ Y_VERIFY_DEBUG(outputChannel.PeerState.InFlightCount >= 1);
+
+ outputChannel.PeerState.InFlightBytes -= it->second.Data.PayloadSize();
+ outputChannel.PeerState.InFlightRows -= it->second.Data.RowCount();
+ outputChannel.PeerState.InFlightCount -= 1;
it = outputChannel.InFlight.erase(it);
}
- outputChannel.PeerState.ActualizeFreeSpace(record.GetFreeSpace());
+ outputChannel.PeerState.PrevPeerFreeSpace = outputChannel.PeerState.PeerFreeSpace;
+ outputChannel.PeerState.PeerFreeSpace = record.GetFreeSpace();
- LOG_T("PeerState, peerState:(" << outputChannel.PeerState.DebugString() << ")"
+ LOG_T("PeerState, freeSpace: " << outputChannel.PeerState.PeerFreeSpace
+ << ", inflight bytes: " << outputChannel.PeerState.InFlightBytes
+ << ", inflight count: " << outputChannel.PeerState.InFlightCount
<< ", sentSeqNo: " << outputChannel.LastSentSeqNo
<< ", ackSeqNo: " << record.GetSeqNo());
@@ -551,13 +560,8 @@ void TDqComputeActorChannels::SetOutputChannelPeer(ui64 channelId, const TActorI
outputChannel.Peer = peer;
}
-bool TDqComputeActorChannels::HasFreeMemoryInChannel(const ui64 channelId) const {
- const TOutputChannelState& outputChannel = OutCh(channelId);
- return outputChannel.PeerState.HasFreeMemory();
-}
-
-bool TDqComputeActorChannels::CanSendChannelData(const ui64 channelId) const {
- const TOutputChannelState& outputChannel = OutCh(channelId);
+bool TDqComputeActorChannels::CanSendChannelData(ui64 channelId) {
+ TOutputChannelState& outputChannel = OutCh(channelId);
return outputChannel.Peer && (!outputChannel.Finished || SupportCheckpoints) && !outputChannel.RetryState;
}
@@ -566,16 +570,16 @@ bool TDqComputeActorChannels::ShouldSkipData(ui64 channelId) {
return outputChannel.Finished && !SupportCheckpoints;
}
-void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, const bool needAck) {
+void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData) {
TOutputChannelState& outputChannel = OutCh(channelData.Proto.GetChannelId());
YQL_ENSURE(!outputChannel.Finished || SupportCheckpoints);
YQL_ENSURE(!outputChannel.RetryState);
- const ui64 seqNo = ++outputChannel.LastSentSeqNo;
- const ui32 chunkBytes = channelData.PayloadSize();
- const ui32 chunkRows = channelData.RowCount();
- const bool finished = channelData.Proto.GetFinished();
+ ui64 seqNo = ++outputChannel.LastSentSeqNo;
+ ui32 chunkBytes = channelData.Proto.GetData().GetRaw().size() + channelData.Payload.size();
+ ui32 chunkRows = channelData.Proto.GetData().GetRows();
+ bool finished = channelData.Proto.GetFinished();
LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
<< ", peer: " << *outputChannel.Peer
@@ -617,10 +621,11 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
outputChannel.Finished = finished;
ui32 flags = CalcMessageFlags(*outputChannel.Peer);
- dataEv->Record.SetNoAck(!needAck);
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
- outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
+ outputChannel.PeerState.InFlightBytes += chunkBytes;
+ outputChannel.PeerState.InFlightRows += chunkRows;
+ outputChannel.PeerState.InFlightCount += 1;
}
bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {
@@ -796,13 +801,7 @@ TDqComputeActorChannels::TInputChannelState& TDqComputeActorChannels::InCh(ui64
return *ch;
}
-const TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) const {
- auto ch = OutputChannelsMap.FindPtr(channelId);
- YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId);
- return *ch;
-}
-
-TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(const ui64 channelId) {
+TDqComputeActorChannels::TOutputChannelState& TDqComputeActorChannels::OutCh(ui64 channelId) {
auto ch = OutputChannelsMap.FindPtr(channelId);
YQL_ENSURE(ch, "task: " << TaskId << ", unknown output channelId: " << channelId);
return *ch;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
index c878c228d12..cade7d4fb70 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
@@ -7,59 +7,17 @@
#include <ydb/library/yql/dq/actors/dq.h>
#include <library/cpp/actors/core/interconnect.h>
-#include <library/cpp/actors/core/log.h>
-#include <util/generic/noncopyable.h>
namespace NYql::NDq {
class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels> {
public:
- struct TPeerState: NNonCopyable::TMoveOnly {
- private:
- static const ui32 InterconnectHeadersSize = 96;
- i64 InFlightBytes = 0;
- i64 InFlightRows = 0;
- i32 InFlightCount = 0;
+ struct TPeerState {
i64 PeerFreeSpace = 0;
- public:
- i64 GetFreeMemory() const {
- return PeerFreeSpace - InFlightBytes;
- }
-
- bool NeedAck() const {
- return (InFlightCount % 16 == 0) || !HasFreeMemory();
- }
-
- TString DebugString() const {
- return TStringBuilder() <<
- "freeSpace:" << PeerFreeSpace << ";" <<
- "inFlightBytes:" << InFlightBytes << ";" <<
- "inFlightCount:" << InFlightCount << ";"
- ;
- }
-
- bool HasFreeMemory() const {
- return PeerFreeSpace >= InFlightBytes;
- }
-
- void ActualizeFreeSpace(const i64 actual) {
- PeerFreeSpace = actual;
- }
-
- void AddInFlight(const ui64 bytes, const ui64 rows) {
- InFlightBytes += bytes + InterconnectHeadersSize;
- InFlightRows += rows;
- InFlightCount += 1;
- }
-
- void RemoveInFlight(const ui64 bytes, const ui64 rows) {
- InFlightBytes -= (bytes + InterconnectHeadersSize);
- Y_VERIFY(InFlightBytes >= 0);
- InFlightRows -= rows;
- Y_VERIFY(InFlightRows >= 0);
- InFlightCount -= 1;
- Y_VERIFY(InFlightCount >= 0);
- }
+ ui64 InFlightBytes = 0;
+ ui64 InFlightRows = 0;
+ ui32 InFlightCount = 0;
+ i64 PrevPeerFreeSpace = 0;
};
struct ICallbacks {
@@ -117,9 +75,8 @@ public:
void SetCheckpointsSupport(); // Finished channels will be polled for checkpoints.
void SetInputChannelPeer(ui64 channelId, const NActors::TActorId& peer);
void SetOutputChannelPeer(ui64 channelId, const NActors::TActorId& peer);
- bool CanSendChannelData(const ui64 channelId) const;
- bool HasFreeMemoryInChannel(const ui64 channelId) const;
- void SendChannelData(TChannelDataOOB&& channelData, const bool needAck);
+ bool CanSendChannelData(ui64 channelId);
+ void SendChannelData(TChannelDataOOB&& channelData);
void SendChannelDataAck(i64 channelId, i64 freeSpace);
bool PollChannel(ui64 channelId, i64 freeSpace);
bool CheckInFlight(const TString& prefix);
@@ -222,7 +179,6 @@ private:
ui32 CalcMessageFlags(const NActors::TActorId& peer);
TInputChannelState& InCh(ui64 channelId);
TOutputChannelState& OutCh(ui64 channelId);
- const TOutputChannelState& OutCh(const ui64 channelId) const;
private:
const NActors::TActorId Owner;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 274e2c03647..ef64de90857 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -350,7 +350,7 @@ protected:
// Send checkpoints to output channels.
ProcessOutputsImpl(ERunStatus::Finished);
- return true; // returns true, when channels were handled synchronously
+ return true; // returns true, when channels were handled syncronously
}
void ProcessOutputsImpl(ERunStatus status) {
@@ -379,7 +379,8 @@ protected:
if (!outputChannel.Finished || Checkpoints) {
if (Channels->CanSendChannelData(channelId)) {
- DrainOutputChannel(outputChannel);
+ auto peerState = Channels->GetOutputChannelInFlightState(channelId);
+ DrainOutputChannel(outputChannel, peerState);
} else {
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
}
@@ -1016,87 +1017,6 @@ protected:
bool Changed = false;
};
TMaybe<TAsyncData> AsyncData;
-
- class TDrainedChannelMessage {
- private:
- TDqSerializedBatch Data;
- NDqProto::TWatermark Watermark;
- NDqProto::TCheckpoint Checkpoint;
-
- ui32 DataSize = 0;
- ui32 WatermarkSize = 0;
- ui32 CheckpointSize = 0;
-
- bool HasData = false;
- bool HasWatermark = false;
- bool HasCheckpoint = false;
- bool Finished = false;
- public:
- const NDqProto::TWatermark* GetWatermarkOptional() const {
- return HasWatermark ? &Watermark : nullptr;
- }
- const NDqProto::TCheckpoint* GetCheckpointOptional() const {
- return HasCheckpoint ? &Checkpoint : nullptr;
- }
-
- TChannelDataOOB BuildChannelData(const ui64 channelId) {
- TChannelDataOOB channelData;
- channelData.Proto.SetChannelId(channelId);
- channelData.Proto.SetFinished(Finished);
- if (HasData) {
- channelData.Proto.MutableData()->Swap(&Data.Proto);
- channelData.Payload = std::move(Data.Payload);
- }
- if (HasWatermark) {
- channelData.Proto.MutableWatermark()->Swap(&Watermark);
- }
- if (HasCheckpoint) {
- channelData.Proto.MutableCheckpoint()->Swap(&Checkpoint);
- }
-
- Y_VERIFY(HasData || HasWatermark || HasCheckpoint || Finished);
- return channelData;
- }
-
- bool ReadData(const TOutputChannelInfo& outputChannel) {
- auto channel = outputChannel.Channel;
-
- HasData = channel->Pop(Data);
- HasWatermark = channel->Pop(Watermark);
- HasCheckpoint = channel->Pop(Checkpoint);
- Finished = !outputChannel.Finished && channel->IsFinished();
-
- if (!HasData && !HasWatermark && !HasCheckpoint && !Finished) {
- return false;
- }
-
- DataSize = Data.Size();
- WatermarkSize = Watermark.ByteSize();
- CheckpointSize = Checkpoint.ByteSize();
-
- return true;
- }
- };
-
- std::vector<TDrainedChannelMessage> DrainChannel(const ui32 countLimit) {
- std::vector<TDrainedChannelMessage> result;
- if (Finished) {
- Y_VERIFY(Channel->IsFinished());
- return result;
- }
- result.reserve(countLimit);
- for (ui32 i = 0; i < countLimit && !Finished; ++i) {
- TDrainedChannelMessage message;
- if (!message.ReadData(*this)) {
- break;
- }
- result.emplace_back(std::move(message));
- if (Channel->IsFinished()) {
- Finished = true;
- }
- }
- return result;
- }
};
struct TAsyncOutputInfoBase {
@@ -1345,9 +1265,9 @@ protected:
protected:
- void UpdateBlocked(TOutputChannelInfo& outputChannel, const bool blocked) {
+ void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) {
if (Y_UNLIKELY(outputChannel.Stats)) {
- if (blocked) {
+ if (toSend <= 0) {
outputChannel.Stats->BlockedByCapacity++;
if (!outputChannel.Stats->StartBlockedTime) {
outputChannel.Stats->StartBlockedTime = TInstant::Now();
@@ -1367,54 +1287,94 @@ private:
return MemoryQuota->GetProfileStats();
}
- virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel) {
+ virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
const bool wasFinished = outputChannel.Finished;
auto channelId = outputChannel.Channel->GetChannelId();
+ const ui32 allowedOvercommit = AllowedChannelsOvercommit();
+
+ const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes;
+
CA_LOG_T("About to drain channelId: " << channelId
<< ", hasPeer: " << outputChannel.HasPeer
+ << ", peerFreeSpace: " << peerState.PeerFreeSpace
+ << ", inFlightBytes: " << peerState.InFlightBytes
+ << ", inFlightRows: " << peerState.InFlightRows
+ << ", inFlightCount: " << peerState.InFlightCount
+ << ", allowedOvercommit: " << allowedOvercommit
+ << ", toSend: " << toSend
<< ", finished: " << outputChannel.Channel->IsFinished());
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
- UpdateBlocked(outputChannel, !Channels->HasFreeMemoryInChannel(channelId));
+ UpdateBlocked(outputChannel, toSend);
- ui32 sentChunks = 0;
- while ((!outputChannel.Finished || Checkpoints) &&
- Channels->HasFreeMemoryInChannel(outputChannel.ChannelId))
- {
- const static ui32 drainPackSize = 16;
- std::vector<typename TOutputChannelInfo::TDrainedChannelMessage> channelData = outputChannel.DrainChannel(drainPackSize);
- ui32 idx = 0;
- for (auto&& i : channelData) {
- if (auto* w = i.GetWatermarkOptional()) {
- CA_LOG_I("Resume inputs by watermark");
- // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
- // But, let it be here, it's better to have the same code as in checkpoints
- ResumeInputsByWatermark(TInstant::MicroSeconds(w->GetTimestampUs()));
- }
- if (i.GetCheckpointOptional()) {
- CA_LOG_I("Resume inputs by checkpoint");
- ResumeInputsByCheckpoint();
- }
-
- Channels->SendChannelData(i.BuildChannelData(outputChannel.ChannelId), ++idx == channelData.size());
- ++sentChunks;
- }
- if (drainPackSize != channelData.size()) {
- if (!outputChannel.Finished) {
- CA_LOG_T("output channelId: " << outputChannel.ChannelId << ", nothing to send and is not finished");
- }
+ i64 remains = toSend;
+ while (remains > 0 && (!outputChannel.Finished || Checkpoints)) {
+ ui32 sent = this->SendChannelDataChunk(outputChannel);
+ if (sent == 0) {
break;
}
+ remains -= sent;
}
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
- ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks;
+ ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend;
+ }
+
+ ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) {
+ auto channel = outputChannel.Channel;
+
+ TDqSerializedBatch data;
+ NDqProto::TWatermark watermark;
+ NDqProto::TCheckpoint checkpoint;
+
+ bool hasData = channel->Pop(data);
+ bool hasWatermark = channel->Pop(watermark);
+ bool hasCheckpoint = channel->Pop(checkpoint);
+ if (!hasData && !hasWatermark && !hasCheckpoint) {
+ if (!channel->IsFinished()) {
+ CA_LOG_T("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished");
+ return 0; // channel is empty and not finished yet
+ }
+ }
+ const bool wasFinished = outputChannel.Finished;
+ outputChannel.Finished = channel->IsFinished();
+ const bool becameFinished = !wasFinished && outputChannel.Finished;
+
+ ui32 dataSize = data.Size();
+ ui32 watermarkSize = watermark.ByteSize();
+ ui32 checkpointSize = checkpoint.ByteSize();
+
+ TChannelDataOOB channelData;
+ channelData.Proto.SetChannelId(channel->GetChannelId());
+ channelData.Proto.SetFinished(outputChannel.Finished);
+ if (hasData) {
+ channelData.Proto.MutableData()->Swap(&data.Proto);
+ channelData.Payload = std::move(data.Payload);
+ }
+ if (hasWatermark) {
+ channelData.Proto.MutableWatermark()->Swap(&watermark);
+ CA_LOG_I("Resume inputs by watermark");
+ // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
+ // But, let it be here, it's better to have the same code as in checkpoints
+ ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs()));
+ }
+ if (hasCheckpoint) {
+ channelData.Proto.MutableCheckpoint()->Swap(&checkpoint);
+ CA_LOG_I("Resume inputs by checkpoint");
+ ResumeInputsByCheckpoint();
+ }
+
+ if (hasData || hasWatermark || hasCheckpoint || becameFinished) {
+ Channels->SendChannelData(std::move(channelData));
+ return dataSize + watermarkSize + checkpointSize;
+ }
+ return 0;
}
virtual void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) {