diff options
-rw-r--r-- | ydb/library/yql/providers/dq/actors/full_result_writer.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/result_actor_base.h | 18 |
2 files changed, 17 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index ef8b3aaeba..1f282026fa 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -95,6 +95,10 @@ private: } record.Data = std::move(request); if (record.Data.GetFinish()) { + ui64 reqSize = record.Data.GetData().ByteSizeLong() + record.Payload.size(); + if (reqSize != 0) { + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue("Non empty final write " + std::to_string(record.Data.ByteSizeLong()) + " " + std::to_string(record.Payload.size())) .SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR))); + } Finish(); } else { Continue(std::move(record)); diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 493914bd2b..bdf0ca35dc 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -144,9 +144,8 @@ namespace NYql::NDqs::NExecutionHelpers { FinishCalled = true; if (FullResultWriterID) { - NDqProto::TFullResultWriterWriteRequest requestRecord; - requestRecord.SetFinish(true); - TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(requestRecord))); + WriteQueue.emplace(TQueueItem::Final()); + TryWriteToFullResultTable(); } else { DoFinish(); } @@ -343,6 +342,7 @@ namespace NYql::NDqs::NExecutionHelpers { if (!src.Data.Payload.IsEmpty()) { req->Record.MutableData()->SetPayloadId(req->AddPayload(std::move(src.Data.Payload))); } + req->Record.SetFinish(src.IsFinal); TBase::Send(FullResultWriterID, std::move(req)); } @@ -353,13 +353,21 @@ namespace NYql::NDqs::NExecutionHelpers { : Data(std::move(data)) , MessageId(messageId) , SentProcessedEvent(false) + , IsFinal(false) { } - //NDqProto::TFullResultWriterWriteRequest WriteRequest; + static TQueueItem Final() { + TQueueItem item({}, "FinalMessage"); + item.SentProcessedEvent = true; + item.IsFinal = true; + return item; + } + NDq::TDqSerializedBatch Data; const TString MessageId; - bool SentProcessedEvent; + bool SentProcessedEvent = false; + bool IsFinal = false; }; protected: |