aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-02-14 18:43:37 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-02-14 18:43:37 +0300
commitc05a3101b2209fff1587bf4d0df33f28d94d76bd (patch)
treee350f3f38505774f4782fb1cb5089d375975dcc4
parent059c787fd720eb6552d2f2584848b33dd42f1ba8 (diff)
downloadydb-c05a3101b2209fff1587bf4d0df33f28d94d76bd.tar.gz
YQL-14355: add fullResultWrite to result_receiver
ref:80445b98140ef840628ec678b30b9fb41932a718
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h2
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h369
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp325
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.cpp144
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.h1
5 files changed, 412 insertions, 429 deletions
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index bd03b264833..7fb39fd1f54 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -92,4 +92,6 @@ namespace NYql::NDqs {
struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> {
DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, "");
};
+
+ using TEvResultReceiverFinish = TEvGraphFinished;
}
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
new file mode 100644
index 00000000000..8db8566e4f7
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -0,0 +1,369 @@
+#pragma once
+
+#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
+#include <ydb/library/yql/providers/dq/actors/events.h>
+#include <ydb/library/yql/providers/dq/actors/proto_builder.h>
+#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
+#include <ydb/library/yql/providers/dq/counters/counters.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/library/yql/utils/failure_injector/failure_injector.h>
+
+#include <util/stream/holder.h>
+#include <util/stream/length.h>
+
+namespace NYql::NDqs::NExecutionHelpers {
+
+ template <class TDerived>
+ class TResultActorBase : public NYql::TSynchronizableRichActor<TDerived>, public NYql::TCounters {
+ protected:
+ using TBase = NYql::TSynchronizableRichActor<TDerived>;
+
+ TResultActorBase(
+ const TVector<TString>& columns,
+ const NActors::TActorId& executerId,
+ const TString& traceId,
+ const TDqConfiguration::TPtr& settings,
+ const TString& resultType,
+ NActors::TActorId graphExecutionEventsId,
+ bool discard)
+ : TBase(&TDerived::Handler)
+ , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false))
+ , ExecuterID(executerId)
+ , GraphExecutionEventsId(graphExecutionEventsId)
+ , Discard(discard)
+ , DataParts()
+ , TraceId(traceId)
+ , Settings(settings)
+ , SizeLimit(
+ (Settings && Settings->_AllResultsBytesLimit.Get().Defined())
+ ? Settings->_AllResultsBytesLimit.Get().GetRef()
+ : 64000000) // GRPC limit
+ , RowsLimit(settings ? Settings->_RowsLimitPerWrite.Get() : Nothing())
+ , Rows(0)
+ , Truncated(false)
+ , FullResultWriterID()
+ , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns))
+ , ResultYson()
+ , ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson)))
+ , ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true))
+ , FullResultSentBytes(0)
+ , FullResultReceivedBytes(0)
+ , FullResultSentDataParts(0)
+ , Issues()
+ , FinishCalled(false)
+ , BlockingActors()
+ , QueryResponse() {
+ ResultYsonWriter->OnBeginList();
+ YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit;
+ YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing");
+ }
+
+ void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
+ YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
+
+ BlockingActors.clear();
+ if (FullResultWriterID) {
+ BlockingActors.insert(FullResultWriterID);
+ TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>());
+ }
+
+ YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
+
+ QueryResponse.Reset(ev->Release().Release());
+ TBase::Become(&TResultActorBase::ShutdownHandler);
+ TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>());
+ }
+
+ void OnReceiveData(NYql::NDqProto::TData&& data) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+
+ if (Discard) {
+ return;
+ }
+
+ if (FullResultTableEnabled && FullResultWriterID) {
+ WriteToFullResultTable(MakeHolder<NDqProto::TData>(data));
+ } else {
+ DataParts.emplace_back(data); // todo: seems like DataParts stores at most 1 element (replace with holder)
+
+ bool full = true;
+ bool exceedRows = false;
+ try {
+ TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; });
+ full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) {
+ if (RowsLimit && Rows + 1 > *RowsLimit) {
+ exceedRows = true;
+ return false;
+ } else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) {
+ return false;
+ }
+ ResultYsonWriter->OnListItem();
+ ResultYsonWriter->OnRaw(rawYson);
+ ++Rows;
+ return true;
+ });
+ } catch (...) {
+ OnError(CurrentExceptionMessage(), false, true);
+ return;
+ }
+
+ if (full) {
+ return;
+ }
+
+ Truncated = true;
+ if (FullResultTableEnabled) {
+ FlushCurrent();
+ } else {
+ TString issueMsg;
+ if (exceedRows) {
+ issueMsg = TStringBuilder() << "Rows limit reached: " << *RowsLimit;
+ } else {
+ issueMsg = TStringBuilder() << "Size limit reached: " << SizeLimit;
+ }
+ TIssue issue(issueMsg);
+ issue.Severity = TSeverityIds::S_WARNING;
+ Issues.AddIssues({issue});
+ Finish();
+ }
+ }
+ }
+
+ void OnFullResultWriterShutdown() {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << "Got TEvGone";
+
+ FullResultWriterID = {};
+ }
+
+ void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ if (ev->Get()->Record.IssuesSize() == 0) {
+ DoFinish();
+ } else {
+ TBase::Send(ExecuterID, ev->Release().Release());
+ }
+ }
+
+ void OnError(const TString& message, bool retriable, bool needFallback) {
+ YQL_LOG(ERROR) << "OnError " << message;
+ auto issueCode = needFallback
+ ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
+ : TIssuesIds::DQ_GATEWAY_ERROR;
+ const auto issue = TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR);
+ Issues.AddIssues({issue}); // remember issue to pass it with TEvQueryResponse, cause executor_actor ignores TEvDqFailure after finish
+ auto req = MakeHolder<TEvDqFailure>(issue, retriable, needFallback);
+ FlushCounters(req->Record);
+ TBase::Send(ExecuterID, req.Release());
+ }
+
+ void Finish() {
+ YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << Truncated;
+ YQL_ENSURE(!FinishCalled);
+ FinishCalled = true;
+
+ if (FullResultWriterID) {
+ NDqProto::TPullResponse response;
+ response.SetResponseType(NDqProto::EPullResponseType::FINISH);
+ TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
+ } else {
+ DoFinish();
+ }
+ }
+
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId())
+ + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
+ + "." + ToString(ev->Get()->SourceType & 0xFFFF);
+ OnError(message, true, true);
+ }
+
+ private:
+ STFUNC(ShutdownHandler) {
+ Y_UNUSED(ctx);
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ HFunc(NActors::TEvents::TEvGone, OnShutdownQueryResult);
+ cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway);
+ HFunc(TEvDqFailure, OnErrorInShutdownState);
+ default:
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << "Unexpected event " << etype;
+ break;
+ }
+ }
+
+ private:
+ void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
+ // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse
+ TIssues issues;
+ IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+ Issues.AddIssues(issues);
+ }
+
+ void OnShutdownQueryResult(NActors::TEvents::TEvGone::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ auto iter = BlockingActors.find(ev->Sender);
+ if (iter != BlockingActors.end()) {
+ BlockingActors.erase(iter);
+ }
+
+ YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left";
+
+ if (BlockingActors.empty()) {
+ EndOnQueryResult();
+ }
+ }
+
+ void DoFinish() {
+ TBase::Send(ExecuterID, new TEvGraphFinished());
+ }
+
+ void FlushCurrent() {
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_ENSURE(!FullResultWriterID);
+ YQL_ENSURE(FullResultTableEnabled);
+
+ NDqProto::TGraphExecutionEvent record;
+ record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT);
+ NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload;
+ payload.SetResultType(ResultBuilder->GetSerializedType());
+ record.MutableMessage()->PackFrom(payload);
+ TBase::Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record));
+ TBase::template Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) {
+ Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
+ YQL_LOG_CTX_SCOPE(TraceId);
+
+ if (auto msg = ev->Get()->Record.GetErrorMessage()) {
+ OnError(msg, false, true);
+ } else {
+ NActorsProto::TActorId fullResultWriterProto;
+ ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto);
+ FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto);
+ WriteAllDataPartsToFullResultTable();
+ }
+ });
+ }
+
+ void EndOnQueryResult() {
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ NDqProto::TQueryResponse result = QueryResponse->Record;
+
+ YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
+ FlushCounters(result);
+
+ if (ResultYsonWriter) {
+ ResultYsonWriter->OnEndList();
+ ResultYsonWriter.Destroy();
+ }
+ ResultYsonOut.Destroy();
+
+ *result.MutableYson() = ResultYson;
+
+ if (!Issues.Empty()) {
+ NYql::IssuesToMessage(Issues, result.MutableIssues());
+ }
+ result.SetTruncated(Truncated);
+
+ TBase::Send(ExecuterID, new TEvQueryResponse(std::move(result)));
+ }
+
+ void DoPassAway() override {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ }
+
+ bool CanSendToFullResultWriter() {
+ // TODO Customize
+ return FullResultSentBytes - FullResultReceivedBytes <= 32_MB;
+ }
+
+ template <class TCallback>
+ void UpdateEventQueueStatus(TCallback callback) {
+ YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB "
+ << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
+ << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
+ TBase::Send(FullResultWriterID, new TEvFullResultWriterStatusRequest());
+ TBase::template Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived();
+ YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB "
+ << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
+ << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
+ if (ev->Get()->Record.HasErrorMessage()) {
+ YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage();
+ OnError(ev->Get()->Record.GetErrorMessage(), false, false);
+ return;
+ }
+ callback();
+ });
+ }
+
+ void WriteAllDataPartsToFullResultTable() {
+ while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) {
+ UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]);
+ DataParts[FullResultSentDataParts].Clear();
+ ++FullResultSentDataParts;
+ }
+ if (FullResultSentDataParts == DataParts.size()) {
+ return;
+ }
+ // here we cannot continue since the event queue is overloaded
+ // kind of tail recursion (but without recursion)
+ UpdateEventQueueStatus([this]() {
+ WriteAllDataPartsToFullResultTable();
+ });
+ }
+
+ void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) {
+ if (CanSendToFullResultWriter()) {
+ UnsafeWriteToFullResultTable(*data);
+ return;
+ }
+ UpdateEventQueueStatus([this, data]() {
+ WriteToFullResultTable(data);
+ });
+ }
+
+ void UnsafeWriteToFullResultTable(const NDqProto::TData& data) {
+ NDqProto::TPullResponse response;
+ response.SetResponseType(NDqProto::EPullResponseType::CONTINUE);
+ response.MutableData()->CopyFrom(data);
+ ui64 respSize = response.ByteSizeLong();
+ TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
+ FullResultSentBytes += respSize;
+ }
+
+ protected:
+ const bool FullResultTableEnabled;
+ const NActors::TActorId ExecuterID;
+ const NActors::TActorId GraphExecutionEventsId;
+ const bool Discard;
+ TVector<NDqProto::TData> DataParts;
+ const TString TraceId;
+ TDqConfiguration::TPtr Settings;
+ ui64 SizeLimit;
+ TMaybe<ui64> RowsLimit;
+ ui64 Rows;
+ bool Truncated;
+ NActors::TActorId FullResultWriterID;
+ THolder<TProtoBuilder> ResultBuilder;
+ TString ResultYson;
+ THolder<TCountingOutput> ResultYsonOut;
+ THolder<NYson::TYsonWriter> ResultYsonWriter;
+
+ ui64 FullResultSentBytes;
+ ui64 FullResultReceivedBytes;
+ ui64 FullResultSentDataParts;
+
+ TIssues Issues;
+ bool FinishCalled;
+
+ THashSet<NActors::TActorId> BlockingActors;
+ THolder<TEvQueryResponse> QueryResponse;
+ };
+} // namespace NYql::NDqs::NExecutionHelpers
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index 59ae95c3af5..44ab1451d91 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -1,14 +1,11 @@
#include "result_aggregator.h"
#include "result_receiver.h"
-#include "proto_builder.h"
-#include "full_result_writer.h"
-#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
#include <ydb/library/yql/providers/dq/actors/executer_actor.h>
+#include <ydb/library/yql/providers/dq/actors/result_actor_base.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
-#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
@@ -32,9 +29,7 @@
#include <util/generic/ptr.h>
#include <util/string/split.h>
#include <util/system/types.h>
-#include <util/stream/holder.h>
#include <util/stream/str.h>
-#include <util/stream/length.h>
namespace NYql::NDqs::NExecutionHelpers {
@@ -54,7 +49,7 @@ using namespace NActors;
namespace {
-class TResultAggregator: public TSynchronizableRichActor<TResultAggregator>, NYql::TCounters {
+class TResultAggregator: public TResultActorBase<TResultAggregator> {
static constexpr ui32 MAX_RESULT_BATCH = 2048;
public:
@@ -62,33 +57,15 @@ public:
explicit TResultAggregator(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId,
const TDqConfiguration::TPtr& settings, const TString& resultType, NActors::TActorId graphExecutionEventsId, bool discard)
- : TSynchronizableRichActor<TResultAggregator>(&TResultAggregator::Handler)
- , ExecuterID(executerId)
- , GraphExecutionEventsId(graphExecutionEventsId)
- , Discard(discard)
- , TraceId(traceId)
- , Settings(settings)
- , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns))
- , ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson)))
- , ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true))
- {
- ResultYsonWriter->OnBeginList();
+ : TResultActorBase<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) {
if (Settings) {
PullRequestTimeout = TDuration::MilliSeconds(settings->PullRequestTimeoutMs.Get().GetOrElse(0));
PingTimeout = TDuration::MilliSeconds(settings->PingTimeoutMs.Get().GetOrElse(0));
PingPeriod = Max(PingTimeout/4, TDuration::MilliSeconds(1000));
-
- SizeLimit = Settings->_AllResultsBytesLimit.Get().GetOrElse(64000000);
- YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit;
-
- if (Settings->_RowsLimitPerWrite.Get()) {
- YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
- RowsLimit = Settings->_RowsLimitPerWrite.Get();
- }
}
}
-private:
+public:
#define HANDLER_STUB(TEvType) \
cFunc(TEvType::EventType, [this]() { \
YQL_LOG_CTX_SCOPE(TraceId); \
@@ -103,43 +80,12 @@ private:
HFunc(TEvQueryResponse, OnQueryResult);
HFunc(TEvDqFailure, OnFullResultWriterResponse);
cFunc(TEvents::TEvPoison::EventType, PassAway)
- hFunc(TEvents::TEvUndelivered, [this] (TEvents::TEvUndelivered::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
- TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(SelfId())
- + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
- + "." + ToString(ev->Get()->SourceType & 0xFFFF);
- OnError(message, true, true);
- })
+ hFunc(TEvents::TEvUndelivered, OnUndelivered)
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown)
})
-
- STRICT_STFUNC(ShutdownHandler, {
- HFunc(TEvents::TEvGone, OnShutdownQueryResult);
- HANDLER_STUB(TEvPullResult)
- HANDLER_STUB(TEvReadyState)
- HANDLER_STUB(TEvPullDataResponse)
- HANDLER_STUB(TEvPingResponse)
- HANDLER_STUB(TEvQueryResponse)
- HANDLER_STUB(TEvDqFailure)
- cFunc(TEvents::TEvPoison::EventType, PassAway);
- HANDLER_STUB(TEvents::TEvUndelivered)
- HANDLER_STUB(TEvents::TEvWakeup)
- })
-
- void DoPassAway() override {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
- }
-
- void OnFullResultWriterShutdown() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Got TEvGone";
-
- FullResultWriterID = {};
- }
-
+private:
void OnWakeup() {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << __FUNCTION__;
@@ -183,22 +129,13 @@ private:
Send(SourceID, MakeHolder<TEvPullDataRequest>(MAX_RESULT_BATCH), IEventHandle::FlagTrackDelivery);
}
- void OnError(const TString& message, bool retriable, bool needFallback) {
- YQL_LOG(ERROR) << "OnError " << message;
- auto issueCode = needFallback
- ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
- : TIssuesIds::DQ_GATEWAY_ERROR;
- auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback);
- FlushCounters(req->Record);
- Send(ExecuterID, req.Release());
- }
-
void OnPingResponse(TEvPingResponse::TPtr&, const TActorContext&) {
PingRequested = false;
}
void OnPullResponse(TEvPullDataResponse::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
if (FinishCalled) {
// finalization has been begun, actor will not kill himself anymore, should ignore responses instead
@@ -214,7 +151,7 @@ private:
Send(SelfId(), MakeHolder<TEvPullResult>());
} break;
case NYql::NDqProto::FINISH:
- Finish(Truncated);
+ Finish();
return;
case NYql::NDqProto::YIELD:
Schedule(TDuration::MilliSeconds(10), new TEvPullResult());
@@ -232,233 +169,10 @@ private:
break;
}
- if (!Discard) {
- auto fullResultTableEnabled = Settings->EnableFullResultWrite.Get().GetOrElse(false);
-
- if (fullResultTableEnabled && Truncated) {
- WriteToFullResultTable(new NDqProto::TData(std::move(*response.MutableData())));
- } else {
- DataParts.emplace_back(std::move(*response.MutableData()));
-
- bool full = true;
- bool exceedRows = false;
- try {
- full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) {
- if (RowsLimit && Rows + 1 > *RowsLimit) {
- exceedRows = true;
- return false;
- } else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) {
- return false;
- }
- ResultYsonWriter->OnListItem();
- ResultYsonWriter->OnRaw(rawYson);
- ++Rows;
- return true;
- });
- } catch (...) {
- OnError(CurrentExceptionMessage(), false, false);
- return;
- }
-
- if (!full) {
- if (fullResultTableEnabled) {
- FlushCurrent();
- } else {
- TString issueMsg;
- if (exceedRows) {
- issueMsg = TStringBuilder() << "Rows limit reached: " << *RowsLimit;
- } else {
- issueMsg = TStringBuilder() << "Size limit reached: " << SizeLimit;
- }
- TIssue issue(issueMsg);
- issue.Severity = TSeverityIds::S_WARNING;
- Finish(/*truncated = */ true, {issue});
- }
- }
- }
- }
- }
-
- void FlushCurrent() {
- YQL_LOG(DEBUG) << __FUNCTION__;
- YQL_ENSURE(!Truncated);
- YQL_ENSURE(!FullResultWriterID);
- YQL_ENSURE(Settings->EnableFullResultWrite.Get().GetOrElse(false));
-
- NDqProto::TGraphExecutionEvent record;
- record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT);
- NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload;
- payload.SetResultType(ResultBuilder->GetSerializedType());
- record.MutableMessage()->PackFrom(payload);
- Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record));
- Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) {
- Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
- YQL_LOG_CTX_SCOPE(TraceId);
-
- if (auto msg = ev->Get()->Record.GetErrorMessage()) {
- OnError(msg, false, true);
- } else {
- NActorsProto::TActorId fullResultWriterProto;
- ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto);
- FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto);
- Truncated = true;
- WriteAllDataPartsToFullResultTable();
- }
- });
- }
-
- bool CanSendToFullResultWriter() {
- // TODO Customize
- return FullResultSentBytes - FullResultReceivedBytes <= 32_MB;
- }
-
- template <class TCallback>
- void UpdateEventQueueStatus(TCallback callback) {
- YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB "
- << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
- << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
- Send(FullResultWriterID, new TEvFullResultWriterStatusRequest());
- Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
- this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived();
- YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB "
- << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
- << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
- if (ev->Get()->Record.HasErrorMessage()) {
- YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage();
- OnError(ev->Get()->Record.GetErrorMessage(), false, false);
- return;
- }
- callback();
- });
- }
-
- void WriteAllDataPartsToFullResultTable() {
- while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) {
- UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]);
- DataParts[FullResultSentDataParts].Clear();
- ++FullResultSentDataParts;
- }
- if (FullResultSentDataParts == DataParts.size()) {
- return;
- }
- // here we cannot continue since the event queue is overloaded
- // kind of tail recursion (but without recursion)
- UpdateEventQueueStatus([this]() {
- WriteAllDataPartsToFullResultTable();
- });
- }
-
- void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) {
- if (CanSendToFullResultWriter()) {
- UnsafeWriteToFullResultTable(*data);
- return;
- }
- UpdateEventQueueStatus([this, data]() {
- WriteToFullResultTable(data);
- });
- }
-
- void UnsafeWriteToFullResultTable(const NDqProto::TData& data) {
- NDqProto::TPullResponse response;
- response.SetResponseType(EPullResponseType::CONTINUE);
- response.MutableData()->CopyFrom(data);
- ui64 respSize = response.ByteSizeLong();
- Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
- FullResultSentBytes += respSize;
- }
-
- void Finish(bool truncated = false, const TIssues& issues = {}) {
- YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << truncated;
- YQL_ENSURE(!FinishCalled);
- FinishCalled = true;
- FinishTruncated = truncated;
- FinishIssues = issues;
- if (FullResultWriterID) {
- NDqProto::TPullResponse response;
- response.SetResponseType(EPullResponseType::FINISH);
- Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
- } else {
- DoFinish();
- }
- }
-
- void DoFinish() {
- Send(ExecuterID, new TEvGraphFinished());
- }
-
- void OnFullResultWriterResponse(TEvDqFailure::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
- if (ev->Get()->Record.IssuesSize() == 0) {
- DoFinish();
- } else {
- Send(ExecuterID, ev->Release().Release());
- }
- }
-
- void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
- YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
-
- BlockingActors.clear();
- if (FullResultWriterID) {
- BlockingActors.insert(FullResultWriterID);
- Send(FullResultWriterID, MakeHolder<TEvents::TEvPoison>());
- }
-
- YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
-
- QueryResponse.Reset(ev->Release().Release());
- Become(&TResultAggregator::ShutdownHandler);
- Send(SelfId(), MakeHolder<TEvents::TEvGone>());
- }
-
- void OnShutdownQueryResult(TEvents::TEvGone::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- auto iter = BlockingActors.find(ev->Sender);
- if (iter != BlockingActors.end()) {
- BlockingActors.erase(iter);
- }
-
- YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left";
-
- if (BlockingActors.empty()) {
- EndOnQueryResult();
- }
- }
-
- void EndOnQueryResult() {
- YQL_LOG(DEBUG) << __FUNCTION__;
- NDqProto::TQueryResponse result = QueryResponse->Record;
-
- YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
- FlushCounters(result);
-
- if (ResultYsonWriter) {
- ResultYsonWriter->OnEndList();
- ResultYsonWriter.Destroy();
- }
- ResultYsonOut.Destroy();
-
- *result.MutableYson() = ResultYson;
-
- if (!FinishIssues.Empty()) {
- IssuesToMessage(FinishIssues, result.MutableIssues());
- }
- result.SetTruncated(FinishTruncated);
-
- Send(ExecuterID, new TEvQueryResponse(std::move(result)));
+ OnReceiveData(std::move(*response.MutableData()));
}
- const NActors::TActorId ExecuterID;
NActors::TActorId SourceID;
- const NActors::TActorId GraphExecutionEventsId;
- const bool Discard;
- TVector<NDqProto::TData> DataParts;
- const TString TraceId;
- TDqConfiguration::TPtr Settings;
TDuration PullRequestTimeout;
TDuration PingTimeout;
TDuration PingPeriod;
@@ -466,25 +180,6 @@ private:
TInstant PullRequestStartTime;
bool PingRequested = false;
NActors::TSchedulerCookieHolder TimerCookieHolder;
- ui64 SizeLimit = 64000000; // GRPC limit
- TMaybe<ui64> RowsLimit;
- ui64 Rows = 0;
- bool Truncated = false;
- NActors::TActorId FullResultWriterID;
- THolder<TProtoBuilder> ResultBuilder;
- TString ResultYson;
- THolder<TCountingOutput> ResultYsonOut;
- THolder<NYson::TYsonWriter> ResultYsonWriter;
- ui64 FullResultSentBytes = 0;
- ui64 FullResultReceivedBytes = 0;
- ui64 FullResultSentDataParts = 0;
-
- TIssues FinishIssues;
- bool FinishTruncated = false;
- bool FinishCalled = false;
-
- THashSet<TActorId> BlockingActors;
- THolder<TEvQueryResponse> QueryResponse;
};
class TResultPrinter: public TActor<TResultPrinter> {
@@ -544,7 +239,7 @@ THolder<NActors::IActor> MakeResultAggregator(
result = MakeHolder<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard);
} else {
// compute actor push
- result = NYql::MakeResultReceiver(columns, executerId, traceId, settings, secureParams, resultType, discard);
+ result = NYql::MakeResultReceiver(columns, executerId, traceId, settings, secureParams, resultType, graphExecutionEventsId, discard);
}
return MakeHolder<TLogWrapReceive>(result.Release(), traceId);
}
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
index 7fdb73f257b..2e0a7bfbddc 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
@@ -3,12 +3,10 @@
#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
+#include <ydb/library/yql/providers/dq/actors/result_actor_base.h>
-#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
#include <ydb/library/yql/providers/dq/actors/executer_actor.h>
-#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
-#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
@@ -34,77 +32,45 @@ using namespace NDqs;
namespace {
-class TResultReceiver: public TRichActor<TResultReceiver> {
+class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TResultReceiver> {
public:
static constexpr char ActorName[] = "YQL_DQ_RESULT_RECEIVER";
explicit TResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings,
- const THashMap<TString, TString>& secureParams, const TString& resultType, bool discard)
- : TRichActor<TResultReceiver>(&TResultReceiver::Handler)
- , ExecuterId(executerId)
- , TraceId(traceId)
- , Settings(settings)
- , SecureParams(std::move(secureParams))
- , ResultBuilder(
- resultType
- ? MakeHolder<TProtoBuilder>(resultType, columns)
- : nullptr)
- , Discard(discard)
- {
- if (Settings) {
- if (Settings->_AllResultsBytesLimit.Get()) {
- YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get();
- }
- if (Settings->_RowsLimitPerWrite.Get()) {
- YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
- }
- }
-
- Y_UNUSED(Size);
- Y_UNUSED(Rows);
+ const THashMap<TString, TString>& /*secureParams*/, const TString& resultType, const NActors::TActorId& graphExecutionEventsId, bool discard)
+ : TResultActorBase<TResultReceiver>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) {
}
-private:
+public:
STRICT_STFUNC(Handler, {
HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData)
HFunc(TEvReadyState, OnReadyState);
HFunc(TEvQueryResponse, OnQueryResult);
- cFunc(TEvents::TEvPoison::EventType, PassAway)
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ HFunc(TEvDqFailure, OnFullResultWriterResponse);
+ hFunc(TEvents::TEvUndelivered, OnUndelivered);
+ cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown);
+ sFunc(TEvResultReceiverFinish, Finish);
})
+private:
void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
- auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
-
- if (!Discard) {
- if (!Finished && ev->Get()->Record.GetChannelData().GetData().GetRaw().size() > 0) {
- DataParts.emplace_back(std::move(ev->Get()->Record.GetChannelData().GetData()));
- Size += DataParts.back().GetRaw().size();
- Rows += DataParts.back().GetRows();
- YQL_LOG(DEBUG) << "Size: " << Size;
- YQL_LOG(DEBUG) << "Rows: " << Rows;
- }
-
- if (Size > 64000000 /* grpc limit*/) {
- OnError("Too big result (grpc limit reached: " + ToString(Size) + " > 64000000)" , false, true);
- } else if (Settings && Settings->_AllResultsBytesLimit.Get() && Size > *Settings->_AllResultsBytesLimit.Get()) {
- TIssue issue("Size limit reached: " + ToString(Size) + ">" + ToString(Settings->_AllResultsBytesLimit.Get()));
- issue.Severity = TSeverityIds::S_WARNING;
- Issues.AddIssue(issue);
- Finish(/*truncated = */ true);
- } else if (Settings && Settings->_RowsLimitPerWrite.Get() && Rows > *Settings->_RowsLimitPerWrite.Get()) {
- TIssue issue("Rows limit reached: " + ToString(Rows) + ">" + ToString(Settings->_RowsLimitPerWrite.Get()));
- issue.Severity = TSeverityIds::S_WARNING;
- Issues.AddIssue(issue);
- Finish(/*truncated = */ true);
- }
+ if (!FinishCalled) {
+ OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData()));
+ }
+ if (!FinishCalled && ev->Get()->Record.GetChannelData().GetFinished()) {
+ Send(SelfId(), MakeHolder<TEvResultReceiverFinish>()); // postpone finish until TFullResultWriterActor is instaniated
}
+ // todo: ack after data is stored to yt?
+ auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId());
res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
res->Record.SetFreeSpace(256_MB);
- res->Record.SetFinish(Finished);
+ res->Record.SetFinish(FinishCalled); // set if premature finish started (when response limit reached and FullResultTable not enabled)
Send(ev->Sender, res.Release());
YQL_LOG(DEBUG) << "Finished: " << ev->Get()->Record.GetChannelData().GetFinished();
@@ -113,70 +79,20 @@ private:
void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) {
// do nothing
}
-
- void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) {
- NDqProto::TQueryResponse result(ev->Get()->Record);
- YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
-
- if (ResultBuilder) {
- try {
- TString yson = Discard ? "" : ResultBuilder->BuildYson(
- DataParts,
- Settings && Settings->_AllResultsBytesLimit.Get()
- ? *Settings->_AllResultsBytesLimit.Get()
- : 64000000 /* grpc limit*/);
- *result.MutableYson() = yson;
- } catch (...) {
- Issues.AddIssue(TIssue(CurrentExceptionMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING));
- result.SetNeedFallback(true);
- }
- } else {
- if (Rows > 0) {
- Issues.AddIssue(TIssue("Non empty rows: " + ToString(Rows)).SetCode(0, TSeverityIds::S_WARNING));
- }
- }
-
- if (!Issues.Empty()) {
- IssuesToMessage(Issues, result.MutableIssues());
- }
- result.SetTruncated(Truncated);
- Send(ExecuterId, new TEvQueryResponse(std::move(result)));
- }
-
- void OnError(const TString& message, bool retriable, bool needFallback) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "OnError " << message;
- auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(-1, TSeverityIds::S_ERROR), retriable, needFallback);
- Send(ExecuterId, req.Release());
- Finished = true;
- }
-
- void Finish(bool truncated = false) {
- Send(ExecuterId, new TEvGraphFinished());
- Finished = true;
- Truncated = truncated;
- }
-
- const NActors::TActorId ExecuterId;
- TVector<NDqProto::TData> DataParts;
- const TString TraceId;
- TDqConfiguration::TPtr Settings;
- bool Finished = false;
- bool Truncated = false;
- TIssues Issues;
- ui64 Size = 0;
- ui64 Rows = 0;
- // const Yql::DqsProto::TFullResultTable FullResultTable;
- const THashMap<TString, TString> SecureParams;
- // THolder<IFullResultWriter> FullResultWriter;
- THolder<TProtoBuilder> ResultBuilder;
- bool Discard = false;
};
} /* namespace */
-THolder<NActors::IActor> MakeResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const THashMap<TString, TString>& secureParams, const TString& resultType, bool discard) {
- return MakeHolder<TResultReceiver>(columns, executerId, traceId, settings, secureParams, resultType, discard);
+THolder<NActors::IActor> MakeResultReceiver(
+ const TVector<TString>& columns,
+ const NActors::TActorId& executerId,
+ const TString& traceId,
+ const TDqConfiguration::TPtr& settings,
+ const THashMap<TString, TString>& secureParams,
+ const TString& resultType,
+ const NActors::TActorId& graphExecutionEventsId,
+ bool discard) {
+ return MakeHolder<TResultReceiver>(columns, executerId, traceId, settings, secureParams, resultType, graphExecutionEventsId, discard);
}
} /* namespace NYql */
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.h b/ydb/library/yql/providers/dq/actors/result_receiver.h
index 968bcede101..e791fa0e224 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.h
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.h
@@ -15,6 +15,7 @@ THolder<NActors::IActor> MakeResultReceiver(
// const Yql::DqsProto::TFullResultTable& resultTable,
const THashMap<TString, TString>& secureParams,
const TString& resultBuilder,
+ const NActors::TActorId& graphExecutionEventsId,
bool discard
);