diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-02-18 18:08:15 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-02-18 18:08:15 +0300 |
commit | 5ac55b01de51e962878b17d5f18ad48afc8c1659 (patch) | |
tree | 04db210fc36b51be42218511f30d365a38fdbb58 | |
parent | 47a67a837880704fa43a9f06152f6e0127cc285b (diff) | |
download | ydb-5ac55b01de51e962878b17d5f18ad48afc8c1659.tar.gz |
YQL-14355: ack message after it's written to yt
ref:2dd6f865d9fa5baec788499aaf0437dcbac0b902
-rw-r--r-- | ydb/library/yql/dq/common/dq_common.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/events.cpp | 15 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/events.h | 25 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/full_result_writer.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/result_actor_base.h | 262 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/result_aggregator.cpp | 59 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/result_receiver.cpp | 88 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/api/protos/dqs.proto | 12 |
8 files changed, 312 insertions, 196 deletions
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index 71cc7be6cb..c60a5b1f4e 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -74,6 +74,9 @@ struct TBaseDqDataEvents { ES_FULL_RESULT_WRITER_STATUS_REQUEST, ES_FULL_RESULT_WRITER_STATUS_RESPONSE, + ES_FULL_RESULT_WRITER_WRITE_REQUEST, + ES_FULL_RESULT_WRITER_ACK, + ES_MESSAGE_PROCESSED, }; }; diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 5ebe75db67..7095124442 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -18,6 +18,10 @@ namespace NYql::NDqs { { } + TEvDqFailure::TEvDqFailure(const TString& error, TIssueCode issueCode, bool retriable, bool needFallback) + : TEvDqFailure(TIssue(error).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback) { + } + TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { Record = std::move(queryResult); } @@ -56,4 +60,15 @@ namespace NYql::NDqs { TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) { Record.CopyFrom(data); } + + TEvFullResultWriterWriteRequest::TEvFullResultWriterWriteRequest(NDqProto::TFullResultWriterWriteRequest&& data) { + Record.Swap(&data); + } + + TEvFullResultWriterAck::TEvFullResultWriterAck(NDqProto::TFullResultWriterAck& data) { + Record.CopyFrom(data); + } + + TEvMessageProcessed::TEvMessageProcessed(const TString& messageId) : MessageId(messageId) { + } } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 7fb39fd1f5..1004fd44d7 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -23,6 +23,7 @@ namespace NYql::NDqs { TEvDqFailure() = default; explicit TEvDqFailure(const TIssues& issues, bool retriable = false, bool needFallback = false); explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false); + TEvDqFailure(const TString& error, TIssueCode, bool retriable = false, bool needFallback = false); }; struct TEvQueryResponse @@ -76,12 +77,14 @@ namespace NYql::NDqs { TEvPingResponse() = default; }; + // legacy struct TEvFullResultWriterStatusRequest : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest, TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> { TEvFullResultWriterStatusRequest() = default; }; + // legacy struct TEvFullResultWriterStatusResponse : NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse, TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> { @@ -93,5 +96,25 @@ namespace NYql::NDqs { DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); }; - using TEvResultReceiverFinish = TEvGraphFinished; + struct TEvFullResultWriterWriteRequest + : NActors::TEventPB<TEvFullResultWriterWriteRequest, NYql::NDqProto::TFullResultWriterWriteRequest, + TDqDataEvents::ES_FULL_RESULT_WRITER_WRITE_REQUEST> { + TEvFullResultWriterWriteRequest() = default; + explicit TEvFullResultWriterWriteRequest(NDqProto::TFullResultWriterWriteRequest&& data); + }; + + struct TEvFullResultWriterAck + : NActors::TEventPB<TEvFullResultWriterAck, NYql::NDqProto::TFullResultWriterAck, + TDqDataEvents::ES_FULL_RESULT_WRITER_ACK> { + TEvFullResultWriterAck() = default; + explicit TEvFullResultWriterAck(NDqProto::TFullResultWriterAck& data); + }; + + struct TEvMessageProcessed : NActors::TEventBase<TEvMessageProcessed, TDqDataEvents::ES_MESSAGE_PROCESSED> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvMessageProcessed, ""); + + explicit TEvMessageProcessed(const TString& messageId); + + const TString MessageId; + }; } diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index 927ac06043..feb3669ab0 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -41,8 +41,8 @@ public: private: STRICT_STFUNC(Handler, { cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) - HFunc(TEvPullDataResponse, OnPullResponse) - HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest) + HFunc(TEvFullResultWriterWriteRequest, OnWriteRequest) + HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest) // legacy }) void PassAway() override { @@ -71,20 +71,13 @@ private: Send(AggregatorID, new TEvFullResultWriterStatusResponse(response)); } - void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) { + void OnWriteRequest(TEvFullResultWriterWriteRequest::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceID); - auto& response = ev->Get()->Record; - - switch (response.GetResponseType()) { - case NDqProto::FINISH: - Finish(); - break; - case NDqProto::CONTINUE: - Continue(response); - break; - default: - YQL_ENSURE(false, "Unsupported pull request"); - break; + auto& request = ev->Get()->Record; + if (request.GetFinish()) { + Finish(); + } else { + Continue(request); } } @@ -94,9 +87,7 @@ private: TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; }); FullResultWriter->Finish(); if (ErrorMessage) { - TIssue issue(*ErrorMessage); - issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); - Send(AggregatorID, MakeHolder<TEvDqFailure>(issue, false, false).Release()); + Send(AggregatorID, MakeHolder<TEvDqFailure>(*ErrorMessage, TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, false, false)); } else { Send(AggregatorID, MakeHolder<TEvDqFailure>().Release()); } @@ -111,14 +102,14 @@ private: Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>()); } - void Continue(NDqProto::TPullResponse& response) { + void Continue(NDqProto::TFullResultWriterWriteRequest& request) { YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount(); - ui64 respSize = response.ByteSizeLong(); - WriteToFullResultTable(response.GetData()); - BytesReceived += respSize; + ui64 reqSize = request.ByteSizeLong(); + WriteToFullResultTable(request); + BytesReceived += reqSize; } - void WriteToFullResultTable(const NDqProto::TData& data) { + void WriteToFullResultTable(NDqProto::TFullResultWriterWriteRequest& request) { if (ErrorMessage) { YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting"; return; @@ -126,18 +117,23 @@ private: try { TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; }); - ResultBuilder->WriteData(data, [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) { + ResultBuilder->WriteData(request.GetData(), [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) { writer->AddRow(value); return true; }); + NDqProto::TFullResultWriterAck ackRecord; + ackRecord.SetMessageId(request.GetMessageId()); + Send(AggregatorID, MakeHolder<TEvFullResultWriterAck>(ackRecord)); } catch (...) { ErrorMessage = CurrentExceptionMessage(); + Send(AggregatorID, MakeHolder<TEvDqFailure>(*ErrorMessage, TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, false, false)); } if (ErrorMessage) { YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage; } } + private: const TString TraceID; THolder<TProtoBuilder> ResultBuilder; diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 8db8566e4f..4269c280d7 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -11,6 +11,7 @@ #include <util/stream/holder.h> #include <util/stream/length.h> +#include <util/generic/queue.h> namespace NYql::NDqs::NExecutionHelpers { @@ -28,13 +29,15 @@ namespace NYql::NDqs::NExecutionHelpers { NActors::TActorId graphExecutionEventsId, bool discard) : TBase(&TDerived::Handler) - , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false)) , ExecuterID(executerId) - , GraphExecutionEventsId(graphExecutionEventsId) - , Discard(discard) - , DataParts() , TraceId(traceId) , Settings(settings) + , FinishCalled(false) + , EarlyFinish(false) + , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false)) + , GraphExecutionEventsId(graphExecutionEventsId) + , Discard(discard) + , WriteQueue() , SizeLimit( (Settings && Settings->_AllResultsBytesLimit.Get().Defined()) ? Settings->_AllResultsBytesLimit.Get().GetRef() @@ -43,57 +46,43 @@ namespace NYql::NDqs::NExecutionHelpers { , Rows(0) , Truncated(false) , FullResultWriterID() - , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns)) + , ResultBuilder(resultType ? MakeHolder<TProtoBuilder>(resultType, columns) : nullptr) , ResultYson() , ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson))) , ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true)) - , FullResultSentBytes(0) - , FullResultReceivedBytes(0) - , FullResultSentDataParts(0) , Issues() - , FinishCalled(false) , BlockingActors() - , QueryResponse() { + , QueryResponse() + , WaitingAckFromFRW(false) { ResultYsonWriter->OnBeginList(); YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit; YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing"); } - void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); - YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; - - BlockingActors.clear(); - if (FullResultWriterID) { - BlockingActors.insert(FullResultWriterID); - TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>()); - } - - YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; - - QueryResponse.Reset(ev->Release().Release()); - TBase::Become(&TResultActorBase::ShutdownHandler); - TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>()); + virtual void FinishFullResultWriter() { + TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>()); } - void OnReceiveData(NYql::NDqProto::TData&& data) { + void OnReceiveData(NYql::NDqProto::TData&& data, const TString& messageId = "", bool autoAck = false) { YQL_LOG_CTX_SCOPE(TraceId); - if (Discard) { + if (data.GetRows() > 0 && !ResultBuilder) { + Issues.AddIssue(TIssue("Non empty rows: >=" + ToString(data.GetRows())).SetCode(0, TSeverityIds::S_WARNING)); + } + if (Discard || !ResultBuilder || autoAck) { + TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(messageId)); return; } + WriteQueue.emplace(std::move(data), messageId); if (FullResultTableEnabled && FullResultWriterID) { - WriteToFullResultTable(MakeHolder<NDqProto::TData>(data)); + TryWriteToFullResultTable(); } else { - DataParts.emplace_back(data); // todo: seems like DataParts stores at most 1 element (replace with holder) - bool full = true; bool exceedRows = false; try { TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; }); - full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) { + full = ResultBuilder->WriteYsonData(WriteQueue.back().WriteRequest.GetData(), [this, &exceedRows](const TString& rawYson) { if (RowsLimit && Rows + 1 > *RowsLimit) { exceedRows = true; return false; @@ -111,6 +100,8 @@ namespace NYql::NDqs::NExecutionHelpers { } if (full) { + WriteQueue.back().SentProcessedEvent = true; + TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(messageId)); return; } @@ -127,28 +118,12 @@ namespace NYql::NDqs::NExecutionHelpers { TIssue issue(issueMsg); issue.Severity = TSeverityIds::S_WARNING; Issues.AddIssues({issue}); + EarlyFinish = true; Finish(); } } } - void OnFullResultWriterShutdown() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Got TEvGone"; - - FullResultWriterID = {}; - } - - void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; - if (ev->Get()->Record.IssuesSize() == 0) { - DoFinish(); - } else { - TBase::Send(ExecuterID, ev->Release().Release()); - } - } - void OnError(const TString& message, bool retriable, bool needFallback) { YQL_LOG(ERROR) << "OnError " << message; auto issueCode = needFallback @@ -167,29 +142,38 @@ namespace NYql::NDqs::NExecutionHelpers { FinishCalled = true; if (FullResultWriterID) { - NDqProto::TPullResponse response; - response.SetResponseType(NDqProto::EPullResponseType::FINISH); - TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); + NDqProto::TFullResultWriterWriteRequest requestRecord; + requestRecord.SetFinish(true); + TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(requestRecord))); } else { DoFinish(); } } - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId()) - + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) - + "." + ToString(ev->Get()->SourceType & 0xFFFF); - OnError(message, true, true); + protected: + STFUNC(HandlerBase) { + Y_UNUSED(ctx); + switch (const ui32 etype = ev->GetTypeRewrite()) { + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered); + HFunc(TEvQueryResponse, OnQueryResult); + HFunc(TEvFullResultWriterAck, OnFullResultWriterAck); + HFunc(TEvDqFailure, OnFullResultWriterResponse); + cFunc(NActors::TEvents::TEvGone::EventType, OnFullResultWriterShutdown); + cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway) + default: + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << "Unexpected event " << etype; + break; + } } - private: - STFUNC(ShutdownHandler) { + STFUNC(ShutdownHandlerBase) { Y_UNUSED(ctx); switch (const ui32 etype = ev->GetTypeRewrite()) { HFunc(NActors::TEvents::TEvGone, OnShutdownQueryResult); cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway); HFunc(TEvDqFailure, OnErrorInShutdownState); + HFunc(TEvFullResultWriterAck, OnFullResultWriterAck); default: YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "Unexpected event " << etype; @@ -198,6 +182,66 @@ namespace NYql::NDqs::NExecutionHelpers { } private: + void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; + + BlockingActors.clear(); + if (FullResultWriterID) { + BlockingActors.insert(FullResultWriterID); + FinishFullResultWriter(); + } + + YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; + + QueryResponse.Reset(ev->Release().Release()); + TBase::Become(&TDerived::ShutdownHandler); + TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>()); + } + + void OnFullResultWriterShutdown() { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << "Got TEvGone"; + + FullResultWriterID = {}; + } + + void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + if (ev->Get()->Record.IssuesSize() == 0) { + DoFinish(); + } else { + TBase::Send(ExecuterID, ev->Release().Release()); + } + } + + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId()) + + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) + + "." + ToString(ev->Get()->SourceType & 0xFFFF); + OnError(message, true, true); + } + + void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + Y_VERIFY(ev->Get()->Record.GetMessageId() == WriteQueue.front().MessageId); + if (!WriteQueue.front().SentProcessedEvent) { // messages, received before limits exceeded, are already been reported + TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(WriteQueue.front().MessageId)); + } + WriteQueue.pop(); + + if (WriteQueue.empty()) { + WaitingAckFromFRW = false; + return; + } + + UnsafeWriteToFullResultTable(); + } + void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse TIssues issues; @@ -244,7 +288,7 @@ namespace NYql::NDqs::NExecutionHelpers { NActorsProto::TActorId fullResultWriterProto; ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto); FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto); - WriteAllDataPartsToFullResultTable(); + TryWriteToFullResultTable(); } }); } @@ -277,75 +321,47 @@ namespace NYql::NDqs::NExecutionHelpers { YQL_LOG(DEBUG) << __FUNCTION__; } - bool CanSendToFullResultWriter() { - // TODO Customize - return FullResultSentBytes - FullResultReceivedBytes <= 32_MB; - } - - template <class TCallback> - void UpdateEventQueueStatus(TCallback callback) { - YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB " - << " received " << (FullResultReceivedBytes / 1024.0) << " kB " - << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; - TBase::Send(FullResultWriterID, new TEvFullResultWriterStatusRequest()); - TBase::template Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived(); - YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB " - << " received " << (FullResultReceivedBytes / 1024.0) << " kB " - << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; - if (ev->Get()->Record.HasErrorMessage()) { - YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage(); - OnError(ev->Get()->Record.GetErrorMessage(), false, false); - return; - } - callback(); - }); - } - - void WriteAllDataPartsToFullResultTable() { - while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) { - UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]); - DataParts[FullResultSentDataParts].Clear(); - ++FullResultSentDataParts; - } - if (FullResultSentDataParts == DataParts.size()) { + void TryWriteToFullResultTable() { + if (WaitingAckFromFRW) { return; } - // here we cannot continue since the event queue is overloaded - // kind of tail recursion (but without recursion) - UpdateEventQueueStatus([this]() { - WriteAllDataPartsToFullResultTable(); - }); + WaitingAckFromFRW = true; + UnsafeWriteToFullResultTable(); } - void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) { - if (CanSendToFullResultWriter()) { - UnsafeWriteToFullResultTable(*data); - return; - } - UpdateEventQueueStatus([this, data]() { - WriteToFullResultTable(data); - }); + void UnsafeWriteToFullResultTable() { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(WriteQueue.front().WriteRequest))); } - void UnsafeWriteToFullResultTable(const NDqProto::TData& data) { - NDqProto::TPullResponse response; - response.SetResponseType(NDqProto::EPullResponseType::CONTINUE); - response.MutableData()->CopyFrom(data); - ui64 respSize = response.ByteSizeLong(); - TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); - FullResultSentBytes += respSize; - } + private: + struct TQueueItem { + TQueueItem(NDqProto::TData&& data, const TString& messageId) + : WriteRequest() + , MessageId(messageId) + , SentProcessedEvent(false) { + *WriteRequest.MutableData() = std::move(data); + WriteRequest.SetMessageId(messageId); + } + + NDqProto::TFullResultWriterWriteRequest WriteRequest; + const TString MessageId; + bool SentProcessedEvent; + }; protected: - const bool FullResultTableEnabled; const NActors::TActorId ExecuterID; - const NActors::TActorId GraphExecutionEventsId; - const bool Discard; - TVector<NDqProto::TData> DataParts; const TString TraceId; TDqConfiguration::TPtr Settings; + bool FinishCalled; + bool EarlyFinish; + + private: + const bool FullResultTableEnabled; + const NActors::TActorId GraphExecutionEventsId; + const bool Discard; + TQueue<TQueueItem> WriteQueue; ui64 SizeLimit; TMaybe<ui64> RowsLimit; ui64 Rows; @@ -355,15 +371,9 @@ namespace NYql::NDqs::NExecutionHelpers { TString ResultYson; THolder<TCountingOutput> ResultYsonOut; THolder<NYson::TYsonWriter> ResultYsonWriter; - - ui64 FullResultSentBytes; - ui64 FullResultReceivedBytes; - ui64 FullResultSentDataParts; - TIssues Issues; - bool FinishCalled; - THashSet<NActors::TActorId> BlockingActors; THolder<TEvQueryResponse> QueryResponse; + bool WaitingAckFromFRW; }; } // namespace NYql::NDqs::NExecutionHelpers diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index 44ab1451d9..538f0c9b65 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -27,6 +27,7 @@ #include <library/cpp/yson/writer.h> #include <util/generic/ptr.h> +#include <util/generic/guid.h> #include <util/string/split.h> #include <util/system/types.h> #include <util/stream/str.h> @@ -50,6 +51,7 @@ using namespace NActors; namespace { class TResultAggregator: public TResultActorBase<TResultAggregator> { + using TBase = TResultActorBase<TResultAggregator>; static constexpr ui32 MAX_RESULT_BATCH = 2048; public: @@ -57,7 +59,8 @@ public: explicit TResultAggregator(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const TString& resultType, NActors::TActorId graphExecutionEventsId, bool discard) - : TResultActorBase<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) { + : TBase(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) + , Continue(false) { if (Settings) { PullRequestTimeout = TDuration::MilliSeconds(settings->PullRequestTimeoutMs.Get().GetOrElse(0)); PingTimeout = TDuration::MilliSeconds(settings->PingTimeoutMs.Get().GetOrElse(0)); @@ -66,26 +69,37 @@ public: } public: -#define HANDLER_STUB(TEvType) \ - cFunc(TEvType::EventType, [this]() { \ - YQL_LOG_CTX_SCOPE(TraceId); \ - YQL_LOG(DEBUG) << "Unexpected event " << ( #TEvType ); \ - }) - - STRICT_STFUNC(Handler, { - HFunc(TEvPullResult, OnPullResult); - HFunc(TEvReadyState, OnReadyState); - HFunc(TEvPullDataResponse, OnPullResponse); - HFunc(TEvPingResponse, OnPingResponse); - HFunc(TEvQueryResponse, OnQueryResult); - HFunc(TEvDqFailure, OnFullResultWriterResponse); - cFunc(TEvents::TEvPoison::EventType, PassAway) - hFunc(TEvents::TEvUndelivered, OnUndelivered) - cFunc(TEvents::TEvWakeup::EventType, OnWakeup) - cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown) - }) + STFUNC(Handler) { + switch (const ui32 etype = ev->GetTypeRewrite()) { + HFunc(TEvPullDataResponse, OnPullResponse); + cFunc(TEvents::TEvWakeup::EventType, OnWakeup) + sFunc(TEvMessageProcessed, OnMessageProcessed) + HFunc(TEvPullResult, OnPullResult); + HFunc(TEvReadyState, OnReadyState); + HFunc(TEvPingResponse, OnPingResponse); + default: + TBase::HandlerBase(ev, ctx); + } + } + + STFUNC(ShutdownHandler) { + switch (const ui32 etype = ev->GetTypeRewrite()) { + sFunc(TEvMessageProcessed, OnMessageProcessed); + default: + TBase::ShutdownHandlerBase(ev, ctx); + } + } private: + void OnMessageProcessed() { + if (!Continue) { + return; + } + + Continue = false; + Send(SelfId(), MakeHolder<TEvPullResult>()); + } + void OnWakeup() { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; @@ -148,7 +162,7 @@ private: switch (response.GetResponseType()) { case NYql::NDqProto::CONTINUE: { - Send(SelfId(), MakeHolder<TEvPullResult>()); + Continue = true; } break; case NYql::NDqProto::FINISH: Finish(); @@ -169,9 +183,11 @@ private: break; } - OnReceiveData(std::move(*response.MutableData())); + // guid here is redundant and serves only for logic validation + OnReceiveData(std::move(*response.MutableData()), TGUID::Create().AsGuidString()); } +private: NActors::TActorId SourceID; TDuration PullRequestTimeout; TDuration PingTimeout; @@ -180,6 +196,7 @@ private: TInstant PullRequestStartTime; bool PingRequested = false; NActors::TSchedulerCookieHolder TimerCookieHolder; + bool Continue; }; class TResultPrinter: public TActor<TResultPrinter> { diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp index 2e0a7bfbdd..7c743e7f60 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp +++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp @@ -34,51 +34,91 @@ namespace { class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TResultReceiver> { public: + using TBase = TResultActorBase<TResultReceiver>; + static constexpr char ActorName[] = "YQL_DQ_RESULT_RECEIVER"; explicit TResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const THashMap<TString, TString>& /*secureParams*/, const TString& resultType, const NActors::TActorId& graphExecutionEventsId, bool discard) - : TResultActorBase<TResultReceiver>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) { + : TBase(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) + , PendingMessages() { } public: - STRICT_STFUNC(Handler, { - HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData) - HFunc(TEvReadyState, OnReadyState); - HFunc(TEvQueryResponse, OnQueryResult); - cFunc(TEvents::TEvPoison::EventType, PassAway); - HFunc(TEvDqFailure, OnFullResultWriterResponse); - hFunc(TEvents::TEvUndelivered, OnUndelivered); - cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown); - sFunc(TEvResultReceiverFinish, Finish); - }) + STFUNC(Handler) { + switch (const ui32 etype = ev->GetTypeRewrite()) { + HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData); + HFunc(TEvReadyState, OnReadyState); + hFunc(TEvMessageProcessed, OnMessageProcessed); + default: + TBase::HandlerBase(ev, ctx); + } + } + + STFUNC(ShutdownHandler) { + switch (const ui32 etype = ev->GetTypeRewrite()) { + hFunc(TEvMessageProcessed, OnMessageProcessed); + default: + TBase::ShutdownHandlerBase(ev, ctx); + } + } private: void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; + bool finishRequested = ev->Get()->Record.GetChannelData().GetFinished(); if (!FinishCalled) { - OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData())); + const auto messageId = GetMessageId(ev); + const auto hasData = ev->Get()->Record.GetChannelData().HasData(); + OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData()), messageId, !hasData); + const auto [it, inserted] = PendingMessages.insert({messageId, std::move(ev)}); + Y_ENSURE(inserted); } - if (!FinishCalled && ev->Get()->Record.GetChannelData().GetFinished()) { - Send(SelfId(), MakeHolder<TEvResultReceiverFinish>()); // postpone finish until TFullResultWriterActor is instaniated - } - - // todo: ack after data is stored to yt? - auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); - res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId()); - res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - res->Record.SetFreeSpace(256_MB); - res->Record.SetFinish(FinishCalled); // set if premature finish started (when response limit reached and FullResultTable not enabled) - Send(ev->Sender, res.Release()); - YQL_LOG(DEBUG) << "Finished: " << ev->Get()->Record.GetChannelData().GetFinished(); + YQL_LOG(DEBUG) << "Finished: " << finishRequested; } void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { // do nothing } + + void OnMessageProcessed(TEvMessageProcessed::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + SendAck(ev->Get()->MessageId); + } + + void SendAck(const TString& messageId) { + const auto messageIt = PendingMessages.find(messageId); + Y_VERIFY(messageIt != PendingMessages.end()); + const auto& message = messageIt->second; + + auto req = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); + req->Record.SetChannelId(message->Get()->Record.GetChannelData().GetChannelId()); + req->Record.SetSeqNo(message->Get()->Record.GetSeqNo()); + req->Record.SetFreeSpace(256_MB); + req->Record.SetFinish(EarlyFinish); // set if premature finish started (when response limit reached and FullResultTable not enabled) + + Send(message->Sender, req.Release()); + PendingMessages.erase(messageIt); + } + + TString GetMessageId(const NDq::TEvDqCompute::TEvChannelData::TPtr& message) const { + TStringBuilder res; + res << message->Get()->Record.GetChannelData().GetChannelId() + << " " << message->Get()->Record.GetSeqNo() + << " " << ToString(message->Sender); + return res; + } + + void FinishFullResultWriter() override { + Finish(); + } + +private: + THashMap<TString, NDq::TEvDqCompute::TEvChannelData::TPtr> PendingMessages; // N.B. TEvChannelData is partially moved }; } /* namespace */ diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 09aecfcf92..309f1ab73d 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -232,14 +232,26 @@ message TGraphExecutionEvent { } } +// legacy message TFullResultWriterStatusRequest { } +// legacy message TFullResultWriterStatusResponse { uint64 BytesReceived = 1; optional string ErrorMessage = 2; } +message TFullResultWriterWriteRequest { + TData Data = 1; + bool Finish = 2; + bytes MessageId = 3; +} + +message TFullResultWriterAck { + bytes MessageId = 1; +} + message TDqTaskPrepareResult { bool Result = 1; repeated TMetric Metric = 2; |