diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-02-14 18:43:37 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-02-14 18:43:37 +0300 |
commit | c05a3101b2209fff1587bf4d0df33f28d94d76bd (patch) | |
tree | e350f3f38505774f4782fb1cb5089d375975dcc4 | |
parent | 059c787fd720eb6552d2f2584848b33dd42f1ba8 (diff) | |
download | ydb-c05a3101b2209fff1587bf4d0df33f28d94d76bd.tar.gz |
YQL-14355: add fullResultWrite to result_receiver
ref:80445b98140ef840628ec678b30b9fb41932a718
5 files changed, 412 insertions, 429 deletions
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index bd03b264833..7fb39fd1f54 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -92,4 +92,6 @@ namespace NYql::NDqs { struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> { DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); }; + + using TEvResultReceiverFinish = TEvGraphFinished; } diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h new file mode 100644 index 00000000000..8db8566e4f7 --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -0,0 +1,369 @@ +#pragma once + +#include <ydb/library/yql/providers/dq/actors/actor_helpers.h> +#include <ydb/library/yql/providers/dq/actors/events.h> +#include <ydb/library/yql/providers/dq/actors/proto_builder.h> +#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> +#include <ydb/library/yql/providers/dq/counters/counters.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/library/yql/utils/failure_injector/failure_injector.h> + +#include <util/stream/holder.h> +#include <util/stream/length.h> + +namespace NYql::NDqs::NExecutionHelpers { + + template <class TDerived> + class TResultActorBase : public NYql::TSynchronizableRichActor<TDerived>, public NYql::TCounters { + protected: + using TBase = NYql::TSynchronizableRichActor<TDerived>; + + TResultActorBase( + const TVector<TString>& columns, + const NActors::TActorId& executerId, + const TString& traceId, + const TDqConfiguration::TPtr& settings, + const TString& resultType, + NActors::TActorId graphExecutionEventsId, + bool discard) + : TBase(&TDerived::Handler) + , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false)) + , ExecuterID(executerId) + , GraphExecutionEventsId(graphExecutionEventsId) + , Discard(discard) + , DataParts() + , TraceId(traceId) + , Settings(settings) + , SizeLimit( + (Settings && Settings->_AllResultsBytesLimit.Get().Defined()) + ? Settings->_AllResultsBytesLimit.Get().GetRef() + : 64000000) // GRPC limit + , RowsLimit(settings ? Settings->_RowsLimitPerWrite.Get() : Nothing()) + , Rows(0) + , Truncated(false) + , FullResultWriterID() + , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns)) + , ResultYson() + , ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson))) + , ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true)) + , FullResultSentBytes(0) + , FullResultReceivedBytes(0) + , FullResultSentDataParts(0) + , Issues() + , FinishCalled(false) + , BlockingActors() + , QueryResponse() { + ResultYsonWriter->OnBeginList(); + YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit; + YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing"); + } + + void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; + + BlockingActors.clear(); + if (FullResultWriterID) { + BlockingActors.insert(FullResultWriterID); + TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>()); + } + + YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; + + QueryResponse.Reset(ev->Release().Release()); + TBase::Become(&TResultActorBase::ShutdownHandler); + TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>()); + } + + void OnReceiveData(NYql::NDqProto::TData&& data) { + YQL_LOG_CTX_SCOPE(TraceId); + + if (Discard) { + return; + } + + if (FullResultTableEnabled && FullResultWriterID) { + WriteToFullResultTable(MakeHolder<NDqProto::TData>(data)); + } else { + DataParts.emplace_back(data); // todo: seems like DataParts stores at most 1 element (replace with holder) + + bool full = true; + bool exceedRows = false; + try { + TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; }); + full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) { + if (RowsLimit && Rows + 1 > *RowsLimit) { + exceedRows = true; + return false; + } else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) { + return false; + } + ResultYsonWriter->OnListItem(); + ResultYsonWriter->OnRaw(rawYson); + ++Rows; + return true; + }); + } catch (...) { + OnError(CurrentExceptionMessage(), false, true); + return; + } + + if (full) { + return; + } + + Truncated = true; + if (FullResultTableEnabled) { + FlushCurrent(); + } else { + TString issueMsg; + if (exceedRows) { + issueMsg = TStringBuilder() << "Rows limit reached: " << *RowsLimit; + } else { + issueMsg = TStringBuilder() << "Size limit reached: " << SizeLimit; + } + TIssue issue(issueMsg); + issue.Severity = TSeverityIds::S_WARNING; + Issues.AddIssues({issue}); + Finish(); + } + } + } + + void OnFullResultWriterShutdown() { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << "Got TEvGone"; + + FullResultWriterID = {}; + } + + void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + if (ev->Get()->Record.IssuesSize() == 0) { + DoFinish(); + } else { + TBase::Send(ExecuterID, ev->Release().Release()); + } + } + + void OnError(const TString& message, bool retriable, bool needFallback) { + YQL_LOG(ERROR) << "OnError " << message; + auto issueCode = needFallback + ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR + : TIssuesIds::DQ_GATEWAY_ERROR; + const auto issue = TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR); + Issues.AddIssues({issue}); // remember issue to pass it with TEvQueryResponse, cause executor_actor ignores TEvDqFailure after finish + auto req = MakeHolder<TEvDqFailure>(issue, retriable, needFallback); + FlushCounters(req->Record); + TBase::Send(ExecuterID, req.Release()); + } + + void Finish() { + YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << Truncated; + YQL_ENSURE(!FinishCalled); + FinishCalled = true; + + if (FullResultWriterID) { + NDqProto::TPullResponse response; + response.SetResponseType(NDqProto::EPullResponseType::FINISH); + TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); + } else { + DoFinish(); + } + } + + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId()) + + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) + + "." + ToString(ev->Get()->SourceType & 0xFFFF); + OnError(message, true, true); + } + + private: + STFUNC(ShutdownHandler) { + Y_UNUSED(ctx); + switch (const ui32 etype = ev->GetTypeRewrite()) { + HFunc(NActors::TEvents::TEvGone, OnShutdownQueryResult); + cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway); + HFunc(TEvDqFailure, OnErrorInShutdownState); + default: + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << "Unexpected event " << etype; + break; + } + } + + private: + void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { + // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse + TIssues issues; + IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); + Issues.AddIssues(issues); + } + + void OnShutdownQueryResult(NActors::TEvents::TEvGone::TPtr& ev, const NActors::TActorContext&) { + YQL_LOG_CTX_SCOPE(TraceId); + auto iter = BlockingActors.find(ev->Sender); + if (iter != BlockingActors.end()) { + BlockingActors.erase(iter); + } + + YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left"; + + if (BlockingActors.empty()) { + EndOnQueryResult(); + } + } + + void DoFinish() { + TBase::Send(ExecuterID, new TEvGraphFinished()); + } + + void FlushCurrent() { + YQL_LOG(DEBUG) << __FUNCTION__; + YQL_ENSURE(!FullResultWriterID); + YQL_ENSURE(FullResultTableEnabled); + + NDqProto::TGraphExecutionEvent record; + record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT); + NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload; + payload.SetResultType(ResultBuilder->GetSerializedType()); + record.MutableMessage()->PackFrom(payload); + TBase::Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record)); + TBase::template Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) { + Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); + YQL_LOG_CTX_SCOPE(TraceId); + + if (auto msg = ev->Get()->Record.GetErrorMessage()) { + OnError(msg, false, true); + } else { + NActorsProto::TActorId fullResultWriterProto; + ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto); + FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto); + WriteAllDataPartsToFullResultTable(); + } + }); + } + + void EndOnQueryResult() { + YQL_LOG(DEBUG) << __FUNCTION__; + NDqProto::TQueryResponse result = QueryResponse->Record; + + YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); + FlushCounters(result); + + if (ResultYsonWriter) { + ResultYsonWriter->OnEndList(); + ResultYsonWriter.Destroy(); + } + ResultYsonOut.Destroy(); + + *result.MutableYson() = ResultYson; + + if (!Issues.Empty()) { + NYql::IssuesToMessage(Issues, result.MutableIssues()); + } + result.SetTruncated(Truncated); + + TBase::Send(ExecuterID, new TEvQueryResponse(std::move(result))); + } + + void DoPassAway() override { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; + } + + bool CanSendToFullResultWriter() { + // TODO Customize + return FullResultSentBytes - FullResultReceivedBytes <= 32_MB; + } + + template <class TCallback> + void UpdateEventQueueStatus(TCallback callback) { + YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB " + << " received " << (FullResultReceivedBytes / 1024.0) << " kB " + << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; + TBase::Send(FullResultWriterID, new TEvFullResultWriterStatusRequest()); + TBase::template Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived(); + YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB " + << " received " << (FullResultReceivedBytes / 1024.0) << " kB " + << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; + if (ev->Get()->Record.HasErrorMessage()) { + YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage(); + OnError(ev->Get()->Record.GetErrorMessage(), false, false); + return; + } + callback(); + }); + } + + void WriteAllDataPartsToFullResultTable() { + while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) { + UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]); + DataParts[FullResultSentDataParts].Clear(); + ++FullResultSentDataParts; + } + if (FullResultSentDataParts == DataParts.size()) { + return; + } + // here we cannot continue since the event queue is overloaded + // kind of tail recursion (but without recursion) + UpdateEventQueueStatus([this]() { + WriteAllDataPartsToFullResultTable(); + }); + } + + void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) { + if (CanSendToFullResultWriter()) { + UnsafeWriteToFullResultTable(*data); + return; + } + UpdateEventQueueStatus([this, data]() { + WriteToFullResultTable(data); + }); + } + + void UnsafeWriteToFullResultTable(const NDqProto::TData& data) { + NDqProto::TPullResponse response; + response.SetResponseType(NDqProto::EPullResponseType::CONTINUE); + response.MutableData()->CopyFrom(data); + ui64 respSize = response.ByteSizeLong(); + TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); + FullResultSentBytes += respSize; + } + + protected: + const bool FullResultTableEnabled; + const NActors::TActorId ExecuterID; + const NActors::TActorId GraphExecutionEventsId; + const bool Discard; + TVector<NDqProto::TData> DataParts; + const TString TraceId; + TDqConfiguration::TPtr Settings; + ui64 SizeLimit; + TMaybe<ui64> RowsLimit; + ui64 Rows; + bool Truncated; + NActors::TActorId FullResultWriterID; + THolder<TProtoBuilder> ResultBuilder; + TString ResultYson; + THolder<TCountingOutput> ResultYsonOut; + THolder<NYson::TYsonWriter> ResultYsonWriter; + + ui64 FullResultSentBytes; + ui64 FullResultReceivedBytes; + ui64 FullResultSentDataParts; + + TIssues Issues; + bool FinishCalled; + + THashSet<NActors::TActorId> BlockingActors; + THolder<TEvQueryResponse> QueryResponse; + }; +} // namespace NYql::NDqs::NExecutionHelpers diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index 59ae95c3af5..44ab1451d91 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -1,14 +1,11 @@ #include "result_aggregator.h" #include "result_receiver.h" -#include "proto_builder.h" -#include "full_result_writer.h" -#include <ydb/library/yql/providers/dq/actors/actor_helpers.h> #include <ydb/library/yql/providers/dq/actors/events.h> #include <ydb/library/yql/providers/dq/actors/executer_actor.h> +#include <ydb/library/yql/providers/dq/actors/result_actor_base.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> -#include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> @@ -32,9 +29,7 @@ #include <util/generic/ptr.h> #include <util/string/split.h> #include <util/system/types.h> -#include <util/stream/holder.h> #include <util/stream/str.h> -#include <util/stream/length.h> namespace NYql::NDqs::NExecutionHelpers { @@ -54,7 +49,7 @@ using namespace NActors; namespace { -class TResultAggregator: public TSynchronizableRichActor<TResultAggregator>, NYql::TCounters { +class TResultAggregator: public TResultActorBase<TResultAggregator> { static constexpr ui32 MAX_RESULT_BATCH = 2048; public: @@ -62,33 +57,15 @@ public: explicit TResultAggregator(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const TString& resultType, NActors::TActorId graphExecutionEventsId, bool discard) - : TSynchronizableRichActor<TResultAggregator>(&TResultAggregator::Handler) - , ExecuterID(executerId) - , GraphExecutionEventsId(graphExecutionEventsId) - , Discard(discard) - , TraceId(traceId) - , Settings(settings) - , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns)) - , ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson))) - , ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true)) - { - ResultYsonWriter->OnBeginList(); + : TResultActorBase<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) { if (Settings) { PullRequestTimeout = TDuration::MilliSeconds(settings->PullRequestTimeoutMs.Get().GetOrElse(0)); PingTimeout = TDuration::MilliSeconds(settings->PingTimeoutMs.Get().GetOrElse(0)); PingPeriod = Max(PingTimeout/4, TDuration::MilliSeconds(1000)); - - SizeLimit = Settings->_AllResultsBytesLimit.Get().GetOrElse(64000000); - YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit; - - if (Settings->_RowsLimitPerWrite.Get()) { - YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); - RowsLimit = Settings->_RowsLimitPerWrite.Get(); - } } } -private: +public: #define HANDLER_STUB(TEvType) \ cFunc(TEvType::EventType, [this]() { \ YQL_LOG_CTX_SCOPE(TraceId); \ @@ -103,43 +80,12 @@ private: HFunc(TEvQueryResponse, OnQueryResult); HFunc(TEvDqFailure, OnFullResultWriterResponse); cFunc(TEvents::TEvPoison::EventType, PassAway) - hFunc(TEvents::TEvUndelivered, [this] (TEvents::TEvUndelivered::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(SelfId()) - + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) - + "." + ToString(ev->Get()->SourceType & 0xFFFF); - OnError(message, true, true); - }) + hFunc(TEvents::TEvUndelivered, OnUndelivered) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown) }) - - STRICT_STFUNC(ShutdownHandler, { - HFunc(TEvents::TEvGone, OnShutdownQueryResult); - HANDLER_STUB(TEvPullResult) - HANDLER_STUB(TEvReadyState) - HANDLER_STUB(TEvPullDataResponse) - HANDLER_STUB(TEvPingResponse) - HANDLER_STUB(TEvQueryResponse) - HANDLER_STUB(TEvDqFailure) - cFunc(TEvents::TEvPoison::EventType, PassAway); - HANDLER_STUB(TEvents::TEvUndelivered) - HANDLER_STUB(TEvents::TEvWakeup) - }) - - void DoPassAway() override { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; - } - - void OnFullResultWriterShutdown() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Got TEvGone"; - - FullResultWriterID = {}; - } - +private: void OnWakeup() { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; @@ -183,22 +129,13 @@ private: Send(SourceID, MakeHolder<TEvPullDataRequest>(MAX_RESULT_BATCH), IEventHandle::FlagTrackDelivery); } - void OnError(const TString& message, bool retriable, bool needFallback) { - YQL_LOG(ERROR) << "OnError " << message; - auto issueCode = needFallback - ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR - : TIssuesIds::DQ_GATEWAY_ERROR; - auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback); - FlushCounters(req->Record); - Send(ExecuterID, req.Release()); - } - void OnPingResponse(TEvPingResponse::TPtr&, const TActorContext&) { PingRequested = false; } void OnPullResponse(TEvPullDataResponse::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; if (FinishCalled) { // finalization has been begun, actor will not kill himself anymore, should ignore responses instead @@ -214,7 +151,7 @@ private: Send(SelfId(), MakeHolder<TEvPullResult>()); } break; case NYql::NDqProto::FINISH: - Finish(Truncated); + Finish(); return; case NYql::NDqProto::YIELD: Schedule(TDuration::MilliSeconds(10), new TEvPullResult()); @@ -232,233 +169,10 @@ private: break; } - if (!Discard) { - auto fullResultTableEnabled = Settings->EnableFullResultWrite.Get().GetOrElse(false); - - if (fullResultTableEnabled && Truncated) { - WriteToFullResultTable(new NDqProto::TData(std::move(*response.MutableData()))); - } else { - DataParts.emplace_back(std::move(*response.MutableData())); - - bool full = true; - bool exceedRows = false; - try { - full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) { - if (RowsLimit && Rows + 1 > *RowsLimit) { - exceedRows = true; - return false; - } else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) { - return false; - } - ResultYsonWriter->OnListItem(); - ResultYsonWriter->OnRaw(rawYson); - ++Rows; - return true; - }); - } catch (...) { - OnError(CurrentExceptionMessage(), false, false); - return; - } - - if (!full) { - if (fullResultTableEnabled) { - FlushCurrent(); - } else { - TString issueMsg; - if (exceedRows) { - issueMsg = TStringBuilder() << "Rows limit reached: " << *RowsLimit; - } else { - issueMsg = TStringBuilder() << "Size limit reached: " << SizeLimit; - } - TIssue issue(issueMsg); - issue.Severity = TSeverityIds::S_WARNING; - Finish(/*truncated = */ true, {issue}); - } - } - } - } - } - - void FlushCurrent() { - YQL_LOG(DEBUG) << __FUNCTION__; - YQL_ENSURE(!Truncated); - YQL_ENSURE(!FullResultWriterID); - YQL_ENSURE(Settings->EnableFullResultWrite.Get().GetOrElse(false)); - - NDqProto::TGraphExecutionEvent record; - record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT); - NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload; - payload.SetResultType(ResultBuilder->GetSerializedType()); - record.MutableMessage()->PackFrom(payload); - Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record)); - Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) { - Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); - YQL_LOG_CTX_SCOPE(TraceId); - - if (auto msg = ev->Get()->Record.GetErrorMessage()) { - OnError(msg, false, true); - } else { - NActorsProto::TActorId fullResultWriterProto; - ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto); - FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto); - Truncated = true; - WriteAllDataPartsToFullResultTable(); - } - }); - } - - bool CanSendToFullResultWriter() { - // TODO Customize - return FullResultSentBytes - FullResultReceivedBytes <= 32_MB; - } - - template <class TCallback> - void UpdateEventQueueStatus(TCallback callback) { - YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB " - << " received " << (FullResultReceivedBytes / 1024.0) << " kB " - << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; - Send(FullResultWriterID, new TEvFullResultWriterStatusRequest()); - Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived(); - YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB " - << " received " << (FullResultReceivedBytes / 1024.0) << " kB " - << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB"; - if (ev->Get()->Record.HasErrorMessage()) { - YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage(); - OnError(ev->Get()->Record.GetErrorMessage(), false, false); - return; - } - callback(); - }); - } - - void WriteAllDataPartsToFullResultTable() { - while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) { - UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]); - DataParts[FullResultSentDataParts].Clear(); - ++FullResultSentDataParts; - } - if (FullResultSentDataParts == DataParts.size()) { - return; - } - // here we cannot continue since the event queue is overloaded - // kind of tail recursion (but without recursion) - UpdateEventQueueStatus([this]() { - WriteAllDataPartsToFullResultTable(); - }); - } - - void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) { - if (CanSendToFullResultWriter()) { - UnsafeWriteToFullResultTable(*data); - return; - } - UpdateEventQueueStatus([this, data]() { - WriteToFullResultTable(data); - }); - } - - void UnsafeWriteToFullResultTable(const NDqProto::TData& data) { - NDqProto::TPullResponse response; - response.SetResponseType(EPullResponseType::CONTINUE); - response.MutableData()->CopyFrom(data); - ui64 respSize = response.ByteSizeLong(); - Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); - FullResultSentBytes += respSize; - } - - void Finish(bool truncated = false, const TIssues& issues = {}) { - YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << truncated; - YQL_ENSURE(!FinishCalled); - FinishCalled = true; - FinishTruncated = truncated; - FinishIssues = issues; - if (FullResultWriterID) { - NDqProto::TPullResponse response; - response.SetResponseType(EPullResponseType::FINISH); - Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response)); - } else { - DoFinish(); - } - } - - void DoFinish() { - Send(ExecuterID, new TEvGraphFinished()); - } - - void OnFullResultWriterResponse(TEvDqFailure::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; - if (ev->Get()->Record.IssuesSize() == 0) { - DoFinish(); - } else { - Send(ExecuterID, ev->Release().Release()); - } - } - - void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); - YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; - - BlockingActors.clear(); - if (FullResultWriterID) { - BlockingActors.insert(FullResultWriterID); - Send(FullResultWriterID, MakeHolder<TEvents::TEvPoison>()); - } - - YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; - - QueryResponse.Reset(ev->Release().Release()); - Become(&TResultAggregator::ShutdownHandler); - Send(SelfId(), MakeHolder<TEvents::TEvGone>()); - } - - void OnShutdownQueryResult(TEvents::TEvGone::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - auto iter = BlockingActors.find(ev->Sender); - if (iter != BlockingActors.end()) { - BlockingActors.erase(iter); - } - - YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left"; - - if (BlockingActors.empty()) { - EndOnQueryResult(); - } - } - - void EndOnQueryResult() { - YQL_LOG(DEBUG) << __FUNCTION__; - NDqProto::TQueryResponse result = QueryResponse->Record; - - YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); - FlushCounters(result); - - if (ResultYsonWriter) { - ResultYsonWriter->OnEndList(); - ResultYsonWriter.Destroy(); - } - ResultYsonOut.Destroy(); - - *result.MutableYson() = ResultYson; - - if (!FinishIssues.Empty()) { - IssuesToMessage(FinishIssues, result.MutableIssues()); - } - result.SetTruncated(FinishTruncated); - - Send(ExecuterID, new TEvQueryResponse(std::move(result))); + OnReceiveData(std::move(*response.MutableData())); } - const NActors::TActorId ExecuterID; NActors::TActorId SourceID; - const NActors::TActorId GraphExecutionEventsId; - const bool Discard; - TVector<NDqProto::TData> DataParts; - const TString TraceId; - TDqConfiguration::TPtr Settings; TDuration PullRequestTimeout; TDuration PingTimeout; TDuration PingPeriod; @@ -466,25 +180,6 @@ private: TInstant PullRequestStartTime; bool PingRequested = false; NActors::TSchedulerCookieHolder TimerCookieHolder; - ui64 SizeLimit = 64000000; // GRPC limit - TMaybe<ui64> RowsLimit; - ui64 Rows = 0; - bool Truncated = false; - NActors::TActorId FullResultWriterID; - THolder<TProtoBuilder> ResultBuilder; - TString ResultYson; - THolder<TCountingOutput> ResultYsonOut; - THolder<NYson::TYsonWriter> ResultYsonWriter; - ui64 FullResultSentBytes = 0; - ui64 FullResultReceivedBytes = 0; - ui64 FullResultSentDataParts = 0; - - TIssues FinishIssues; - bool FinishTruncated = false; - bool FinishCalled = false; - - THashSet<TActorId> BlockingActors; - THolder<TEvQueryResponse> QueryResponse; }; class TResultPrinter: public TActor<TResultPrinter> { @@ -544,7 +239,7 @@ THolder<NActors::IActor> MakeResultAggregator( result = MakeHolder<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard); } else { // compute actor push - result = NYql::MakeResultReceiver(columns, executerId, traceId, settings, secureParams, resultType, discard); + result = NYql::MakeResultReceiver(columns, executerId, traceId, settings, secureParams, resultType, graphExecutionEventsId, discard); } return MakeHolder<TLogWrapReceive>(result.Release(), traceId); } diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp index 7fdb73f257b..2e0a7bfbddc 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp +++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp @@ -3,12 +3,10 @@ #include <ydb/library/yql/providers/dq/actors/execution_helpers.h> #include <ydb/library/yql/providers/dq/actors/events.h> +#include <ydb/library/yql/providers/dq/actors/result_actor_base.h> -#include <ydb/library/yql/providers/dq/actors/actor_helpers.h> #include <ydb/library/yql/providers/dq/actors/executer_actor.h> -#include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> -#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> @@ -34,77 +32,45 @@ using namespace NDqs; namespace { -class TResultReceiver: public TRichActor<TResultReceiver> { +class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TResultReceiver> { public: static constexpr char ActorName[] = "YQL_DQ_RESULT_RECEIVER"; explicit TResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, - const THashMap<TString, TString>& secureParams, const TString& resultType, bool discard) - : TRichActor<TResultReceiver>(&TResultReceiver::Handler) - , ExecuterId(executerId) - , TraceId(traceId) - , Settings(settings) - , SecureParams(std::move(secureParams)) - , ResultBuilder( - resultType - ? MakeHolder<TProtoBuilder>(resultType, columns) - : nullptr) - , Discard(discard) - { - if (Settings) { - if (Settings->_AllResultsBytesLimit.Get()) { - YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); - } - if (Settings->_RowsLimitPerWrite.Get()) { - YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); - } - } - - Y_UNUSED(Size); - Y_UNUSED(Rows); + const THashMap<TString, TString>& /*secureParams*/, const TString& resultType, const NActors::TActorId& graphExecutionEventsId, bool discard) + : TResultActorBase<TResultReceiver>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) { } -private: +public: STRICT_STFUNC(Handler, { HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData) HFunc(TEvReadyState, OnReadyState); HFunc(TEvQueryResponse, OnQueryResult); - cFunc(TEvents::TEvPoison::EventType, PassAway) + cFunc(TEvents::TEvPoison::EventType, PassAway); + HFunc(TEvDqFailure, OnFullResultWriterResponse); + hFunc(TEvents::TEvUndelivered, OnUndelivered); + cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown); + sFunc(TEvResultReceiverFinish, Finish); }) +private: void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << __FUNCTION__; - auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); - - if (!Discard) { - if (!Finished && ev->Get()->Record.GetChannelData().GetData().GetRaw().size() > 0) { - DataParts.emplace_back(std::move(ev->Get()->Record.GetChannelData().GetData())); - Size += DataParts.back().GetRaw().size(); - Rows += DataParts.back().GetRows(); - YQL_LOG(DEBUG) << "Size: " << Size; - YQL_LOG(DEBUG) << "Rows: " << Rows; - } - - if (Size > 64000000 /* grpc limit*/) { - OnError("Too big result (grpc limit reached: " + ToString(Size) + " > 64000000)" , false, true); - } else if (Settings && Settings->_AllResultsBytesLimit.Get() && Size > *Settings->_AllResultsBytesLimit.Get()) { - TIssue issue("Size limit reached: " + ToString(Size) + ">" + ToString(Settings->_AllResultsBytesLimit.Get())); - issue.Severity = TSeverityIds::S_WARNING; - Issues.AddIssue(issue); - Finish(/*truncated = */ true); - } else if (Settings && Settings->_RowsLimitPerWrite.Get() && Rows > *Settings->_RowsLimitPerWrite.Get()) { - TIssue issue("Rows limit reached: " + ToString(Rows) + ">" + ToString(Settings->_RowsLimitPerWrite.Get())); - issue.Severity = TSeverityIds::S_WARNING; - Issues.AddIssue(issue); - Finish(/*truncated = */ true); - } + if (!FinishCalled) { + OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData())); + } + if (!FinishCalled && ev->Get()->Record.GetChannelData().GetFinished()) { + Send(SelfId(), MakeHolder<TEvResultReceiverFinish>()); // postpone finish until TFullResultWriterActor is instaniated } + // todo: ack after data is stored to yt? + auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId()); res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); res->Record.SetFreeSpace(256_MB); - res->Record.SetFinish(Finished); + res->Record.SetFinish(FinishCalled); // set if premature finish started (when response limit reached and FullResultTable not enabled) Send(ev->Sender, res.Release()); YQL_LOG(DEBUG) << "Finished: " << ev->Get()->Record.GetChannelData().GetFinished(); @@ -113,70 +79,20 @@ private: void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { // do nothing } - - void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { - NDqProto::TQueryResponse result(ev->Get()->Record); - YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); - - if (ResultBuilder) { - try { - TString yson = Discard ? "" : ResultBuilder->BuildYson( - DataParts, - Settings && Settings->_AllResultsBytesLimit.Get() - ? *Settings->_AllResultsBytesLimit.Get() - : 64000000 /* grpc limit*/); - *result.MutableYson() = yson; - } catch (...) { - Issues.AddIssue(TIssue(CurrentExceptionMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)); - result.SetNeedFallback(true); - } - } else { - if (Rows > 0) { - Issues.AddIssue(TIssue("Non empty rows: " + ToString(Rows)).SetCode(0, TSeverityIds::S_WARNING)); - } - } - - if (!Issues.Empty()) { - IssuesToMessage(Issues, result.MutableIssues()); - } - result.SetTruncated(Truncated); - Send(ExecuterId, new TEvQueryResponse(std::move(result))); - } - - void OnError(const TString& message, bool retriable, bool needFallback) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "OnError " << message; - auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(-1, TSeverityIds::S_ERROR), retriable, needFallback); - Send(ExecuterId, req.Release()); - Finished = true; - } - - void Finish(bool truncated = false) { - Send(ExecuterId, new TEvGraphFinished()); - Finished = true; - Truncated = truncated; - } - - const NActors::TActorId ExecuterId; - TVector<NDqProto::TData> DataParts; - const TString TraceId; - TDqConfiguration::TPtr Settings; - bool Finished = false; - bool Truncated = false; - TIssues Issues; - ui64 Size = 0; - ui64 Rows = 0; - // const Yql::DqsProto::TFullResultTable FullResultTable; - const THashMap<TString, TString> SecureParams; - // THolder<IFullResultWriter> FullResultWriter; - THolder<TProtoBuilder> ResultBuilder; - bool Discard = false; }; } /* namespace */ -THolder<NActors::IActor> MakeResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const THashMap<TString, TString>& secureParams, const TString& resultType, bool discard) { - return MakeHolder<TResultReceiver>(columns, executerId, traceId, settings, secureParams, resultType, discard); +THolder<NActors::IActor> MakeResultReceiver( + const TVector<TString>& columns, + const NActors::TActorId& executerId, + const TString& traceId, + const TDqConfiguration::TPtr& settings, + const THashMap<TString, TString>& secureParams, + const TString& resultType, + const NActors::TActorId& graphExecutionEventsId, + bool discard) { + return MakeHolder<TResultReceiver>(columns, executerId, traceId, settings, secureParams, resultType, graphExecutionEventsId, discard); } } /* namespace NYql */ diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.h b/ydb/library/yql/providers/dq/actors/result_receiver.h index 968bcede101..e791fa0e224 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.h +++ b/ydb/library/yql/providers/dq/actors/result_receiver.h @@ -15,6 +15,7 @@ THolder<NActors::IActor> MakeResultReceiver( // const Yql::DqsProto::TFullResultTable& resultTable, const THashMap<TString, TString>& secureParams, const TString& resultBuilder, + const NActors::TActorId& graphExecutionEventsId, bool discard ); |