aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h18
2 files changed, 17 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index ef8b3aaeba..1f282026fa 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -95,6 +95,10 @@ private:
}
record.Data = std::move(request);
if (record.Data.GetFinish()) {
+ ui64 reqSize = record.Data.GetData().ByteSizeLong() + record.Payload.size();
+ if (reqSize != 0) {
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue("Non empty final write " + std::to_string(record.Data.ByteSizeLong()) + " " + std::to_string(record.Payload.size())) .SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)));
+ }
Finish();
} else {
Continue(std::move(record));
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index 493914bd2b..bdf0ca35dc 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -144,9 +144,8 @@ namespace NYql::NDqs::NExecutionHelpers {
FinishCalled = true;
if (FullResultWriterID) {
- NDqProto::TFullResultWriterWriteRequest requestRecord;
- requestRecord.SetFinish(true);
- TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(requestRecord)));
+ WriteQueue.emplace(TQueueItem::Final());
+ TryWriteToFullResultTable();
} else {
DoFinish();
}
@@ -343,6 +342,7 @@ namespace NYql::NDqs::NExecutionHelpers {
if (!src.Data.Payload.IsEmpty()) {
req->Record.MutableData()->SetPayloadId(req->AddPayload(std::move(src.Data.Payload)));
}
+ req->Record.SetFinish(src.IsFinal);
TBase::Send(FullResultWriterID, std::move(req));
}
@@ -353,13 +353,21 @@ namespace NYql::NDqs::NExecutionHelpers {
: Data(std::move(data))
, MessageId(messageId)
, SentProcessedEvent(false)
+ , IsFinal(false)
{
}
- //NDqProto::TFullResultWriterWriteRequest WriteRequest;
+ static TQueueItem Final() {
+ TQueueItem item({}, "FinalMessage");
+ item.SentProcessedEvent = true;
+ item.IsFinal = true;
+ return item;
+ }
+
NDq::TDqSerializedBatch Data;
const TString MessageId;
- bool SentProcessedEvent;
+ bool SentProcessedEvent = false;
+ bool IsFinal = false;
};
protected: