aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-02-18 18:08:15 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-02-18 18:08:15 +0300
commit5ac55b01de51e962878b17d5f18ad48afc8c1659 (patch)
tree04db210fc36b51be42218511f30d365a38fdbb58
parent47a67a837880704fa43a9f06152f6e0127cc285b (diff)
downloadydb-5ac55b01de51e962878b17d5f18ad48afc8c1659.tar.gz
YQL-14355: ack message after it's written to yt
ref:2dd6f865d9fa5baec788499aaf0437dcbac0b902
-rw-r--r--ydb/library/yql/dq/common/dq_common.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp15
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h25
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp44
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h262
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp59
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.cpp88
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto12
8 files changed, 312 insertions, 196 deletions
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h
index 71cc7be6cb..c60a5b1f4e 100644
--- a/ydb/library/yql/dq/common/dq_common.h
+++ b/ydb/library/yql/dq/common/dq_common.h
@@ -74,6 +74,9 @@ struct TBaseDqDataEvents {
ES_FULL_RESULT_WRITER_STATUS_REQUEST,
ES_FULL_RESULT_WRITER_STATUS_RESPONSE,
+ ES_FULL_RESULT_WRITER_WRITE_REQUEST,
+ ES_FULL_RESULT_WRITER_ACK,
+ ES_MESSAGE_PROCESSED,
};
};
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 5ebe75db67..7095124442 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -18,6 +18,10 @@ namespace NYql::NDqs {
{
}
+ TEvDqFailure::TEvDqFailure(const TString& error, TIssueCode issueCode, bool retriable, bool needFallback)
+ : TEvDqFailure(TIssue(error).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback) {
+ }
+
TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
Record = std::move(queryResult);
}
@@ -56,4 +60,15 @@ namespace NYql::NDqs {
TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) {
Record.CopyFrom(data);
}
+
+ TEvFullResultWriterWriteRequest::TEvFullResultWriterWriteRequest(NDqProto::TFullResultWriterWriteRequest&& data) {
+ Record.Swap(&data);
+ }
+
+ TEvFullResultWriterAck::TEvFullResultWriterAck(NDqProto::TFullResultWriterAck& data) {
+ Record.CopyFrom(data);
+ }
+
+ TEvMessageProcessed::TEvMessageProcessed(const TString& messageId) : MessageId(messageId) {
+ }
}
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index 7fb39fd1f5..1004fd44d7 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -23,6 +23,7 @@ namespace NYql::NDqs {
TEvDqFailure() = default;
explicit TEvDqFailure(const TIssues& issues, bool retriable = false, bool needFallback = false);
explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false);
+ TEvDqFailure(const TString& error, TIssueCode, bool retriable = false, bool needFallback = false);
};
struct TEvQueryResponse
@@ -76,12 +77,14 @@ namespace NYql::NDqs {
TEvPingResponse() = default;
};
+ // legacy
struct TEvFullResultWriterStatusRequest
: NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest,
TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> {
TEvFullResultWriterStatusRequest() = default;
};
+ // legacy
struct TEvFullResultWriterStatusResponse
: NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse,
TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> {
@@ -93,5 +96,25 @@ namespace NYql::NDqs {
DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, "");
};
- using TEvResultReceiverFinish = TEvGraphFinished;
+ struct TEvFullResultWriterWriteRequest
+ : NActors::TEventPB<TEvFullResultWriterWriteRequest, NYql::NDqProto::TFullResultWriterWriteRequest,
+ TDqDataEvents::ES_FULL_RESULT_WRITER_WRITE_REQUEST> {
+ TEvFullResultWriterWriteRequest() = default;
+ explicit TEvFullResultWriterWriteRequest(NDqProto::TFullResultWriterWriteRequest&& data);
+ };
+
+ struct TEvFullResultWriterAck
+ : NActors::TEventPB<TEvFullResultWriterAck, NYql::NDqProto::TFullResultWriterAck,
+ TDqDataEvents::ES_FULL_RESULT_WRITER_ACK> {
+ TEvFullResultWriterAck() = default;
+ explicit TEvFullResultWriterAck(NDqProto::TFullResultWriterAck& data);
+ };
+
+ struct TEvMessageProcessed : NActors::TEventBase<TEvMessageProcessed, TDqDataEvents::ES_MESSAGE_PROCESSED> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvMessageProcessed, "");
+
+ explicit TEvMessageProcessed(const TString& messageId);
+
+ const TString MessageId;
+ };
}
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index 927ac06043..feb3669ab0 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -41,8 +41,8 @@ public:
private:
STRICT_STFUNC(Handler, {
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
- HFunc(TEvPullDataResponse, OnPullResponse)
- HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest)
+ HFunc(TEvFullResultWriterWriteRequest, OnWriteRequest)
+ HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest) // legacy
})
void PassAway() override {
@@ -71,20 +71,13 @@ private:
Send(AggregatorID, new TEvFullResultWriterStatusResponse(response));
}
- void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) {
+ void OnWriteRequest(TEvFullResultWriterWriteRequest::TPtr& ev, const NActors::TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceID);
- auto& response = ev->Get()->Record;
-
- switch (response.GetResponseType()) {
- case NDqProto::FINISH:
- Finish();
- break;
- case NDqProto::CONTINUE:
- Continue(response);
- break;
- default:
- YQL_ENSURE(false, "Unsupported pull request");
- break;
+ auto& request = ev->Get()->Record;
+ if (request.GetFinish()) {
+ Finish();
+ } else {
+ Continue(request);
}
}
@@ -94,9 +87,7 @@ private:
TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; });
FullResultWriter->Finish();
if (ErrorMessage) {
- TIssue issue(*ErrorMessage);
- issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
- Send(AggregatorID, MakeHolder<TEvDqFailure>(issue, false, false).Release());
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(*ErrorMessage, TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, false, false));
} else {
Send(AggregatorID, MakeHolder<TEvDqFailure>().Release());
}
@@ -111,14 +102,14 @@ private:
Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>());
}
- void Continue(NDqProto::TPullResponse& response) {
+ void Continue(NDqProto::TFullResultWriterWriteRequest& request) {
YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount();
- ui64 respSize = response.ByteSizeLong();
- WriteToFullResultTable(response.GetData());
- BytesReceived += respSize;
+ ui64 reqSize = request.ByteSizeLong();
+ WriteToFullResultTable(request);
+ BytesReceived += reqSize;
}
- void WriteToFullResultTable(const NDqProto::TData& data) {
+ void WriteToFullResultTable(NDqProto::TFullResultWriterWriteRequest& request) {
if (ErrorMessage) {
YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting";
return;
@@ -126,18 +117,23 @@ private:
try {
TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; });
- ResultBuilder->WriteData(data, [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) {
+ ResultBuilder->WriteData(request.GetData(), [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) {
writer->AddRow(value);
return true;
});
+ NDqProto::TFullResultWriterAck ackRecord;
+ ackRecord.SetMessageId(request.GetMessageId());
+ Send(AggregatorID, MakeHolder<TEvFullResultWriterAck>(ackRecord));
} catch (...) {
ErrorMessage = CurrentExceptionMessage();
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(*ErrorMessage, TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, false, false));
}
if (ErrorMessage) {
YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage;
}
}
+
private:
const TString TraceID;
THolder<TProtoBuilder> ResultBuilder;
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index 8db8566e4f..4269c280d7 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -11,6 +11,7 @@
#include <util/stream/holder.h>
#include <util/stream/length.h>
+#include <util/generic/queue.h>
namespace NYql::NDqs::NExecutionHelpers {
@@ -28,13 +29,15 @@ namespace NYql::NDqs::NExecutionHelpers {
NActors::TActorId graphExecutionEventsId,
bool discard)
: TBase(&TDerived::Handler)
- , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false))
, ExecuterID(executerId)
- , GraphExecutionEventsId(graphExecutionEventsId)
- , Discard(discard)
- , DataParts()
, TraceId(traceId)
, Settings(settings)
+ , FinishCalled(false)
+ , EarlyFinish(false)
+ , FullResultTableEnabled(settings->EnableFullResultWrite.Get().GetOrElse(false))
+ , GraphExecutionEventsId(graphExecutionEventsId)
+ , Discard(discard)
+ , WriteQueue()
, SizeLimit(
(Settings && Settings->_AllResultsBytesLimit.Get().Defined())
? Settings->_AllResultsBytesLimit.Get().GetRef()
@@ -43,57 +46,43 @@ namespace NYql::NDqs::NExecutionHelpers {
, Rows(0)
, Truncated(false)
, FullResultWriterID()
- , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns))
+ , ResultBuilder(resultType ? MakeHolder<TProtoBuilder>(resultType, columns) : nullptr)
, ResultYson()
, ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson)))
, ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true))
- , FullResultSentBytes(0)
- , FullResultReceivedBytes(0)
- , FullResultSentDataParts(0)
, Issues()
- , FinishCalled(false)
, BlockingActors()
- , QueryResponse() {
+ , QueryResponse()
+ , WaitingAckFromFRW(false) {
ResultYsonWriter->OnBeginList();
YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit;
YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing");
}
- void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
- YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
-
- BlockingActors.clear();
- if (FullResultWriterID) {
- BlockingActors.insert(FullResultWriterID);
- TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>());
- }
-
- YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
-
- QueryResponse.Reset(ev->Release().Release());
- TBase::Become(&TResultActorBase::ShutdownHandler);
- TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>());
+ virtual void FinishFullResultWriter() {
+ TBase::Send(FullResultWriterID, MakeHolder<NActors::TEvents::TEvPoison>());
}
- void OnReceiveData(NYql::NDqProto::TData&& data) {
+ void OnReceiveData(NYql::NDqProto::TData&& data, const TString& messageId = "", bool autoAck = false) {
YQL_LOG_CTX_SCOPE(TraceId);
- if (Discard) {
+ if (data.GetRows() > 0 && !ResultBuilder) {
+ Issues.AddIssue(TIssue("Non empty rows: >=" + ToString(data.GetRows())).SetCode(0, TSeverityIds::S_WARNING));
+ }
+ if (Discard || !ResultBuilder || autoAck) {
+ TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(messageId));
return;
}
+ WriteQueue.emplace(std::move(data), messageId);
if (FullResultTableEnabled && FullResultWriterID) {
- WriteToFullResultTable(MakeHolder<NDqProto::TData>(data));
+ TryWriteToFullResultTable();
} else {
- DataParts.emplace_back(data); // todo: seems like DataParts stores at most 1 element (replace with holder)
-
bool full = true;
bool exceedRows = false;
try {
TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; });
- full = ResultBuilder->WriteYsonData(DataParts.back(), [this, &exceedRows](const TString& rawYson) {
+ full = ResultBuilder->WriteYsonData(WriteQueue.back().WriteRequest.GetData(), [this, &exceedRows](const TString& rawYson) {
if (RowsLimit && Rows + 1 > *RowsLimit) {
exceedRows = true;
return false;
@@ -111,6 +100,8 @@ namespace NYql::NDqs::NExecutionHelpers {
}
if (full) {
+ WriteQueue.back().SentProcessedEvent = true;
+ TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(messageId));
return;
}
@@ -127,28 +118,12 @@ namespace NYql::NDqs::NExecutionHelpers {
TIssue issue(issueMsg);
issue.Severity = TSeverityIds::S_WARNING;
Issues.AddIssues({issue});
+ EarlyFinish = true;
Finish();
}
}
}
- void OnFullResultWriterShutdown() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Got TEvGone";
-
- FullResultWriterID = {};
- }
-
- void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
- if (ev->Get()->Record.IssuesSize() == 0) {
- DoFinish();
- } else {
- TBase::Send(ExecuterID, ev->Release().Release());
- }
- }
-
void OnError(const TString& message, bool retriable, bool needFallback) {
YQL_LOG(ERROR) << "OnError " << message;
auto issueCode = needFallback
@@ -167,29 +142,38 @@ namespace NYql::NDqs::NExecutionHelpers {
FinishCalled = true;
if (FullResultWriterID) {
- NDqProto::TPullResponse response;
- response.SetResponseType(NDqProto::EPullResponseType::FINISH);
- TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
+ NDqProto::TFullResultWriterWriteRequest requestRecord;
+ requestRecord.SetFinish(true);
+ TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(requestRecord)));
} else {
DoFinish();
}
}
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
- TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId())
- + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
- + "." + ToString(ev->Get()->SourceType & 0xFFFF);
- OnError(message, true, true);
+ protected:
+ STFUNC(HandlerBase) {
+ Y_UNUSED(ctx);
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered);
+ HFunc(TEvQueryResponse, OnQueryResult);
+ HFunc(TEvFullResultWriterAck, OnFullResultWriterAck);
+ HFunc(TEvDqFailure, OnFullResultWriterResponse);
+ cFunc(NActors::TEvents::TEvGone::EventType, OnFullResultWriterShutdown);
+ cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway)
+ default:
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << "Unexpected event " << etype;
+ break;
+ }
}
- private:
- STFUNC(ShutdownHandler) {
+ STFUNC(ShutdownHandlerBase) {
Y_UNUSED(ctx);
switch (const ui32 etype = ev->GetTypeRewrite()) {
HFunc(NActors::TEvents::TEvGone, OnShutdownQueryResult);
cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway);
HFunc(TEvDqFailure, OnErrorInShutdownState);
+ HFunc(TEvFullResultWriterAck, OnFullResultWriterAck);
default:
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << "Unexpected event " << etype;
@@ -198,6 +182,66 @@ namespace NYql::NDqs::NExecutionHelpers {
}
private:
+ void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
+ YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
+
+ BlockingActors.clear();
+ if (FullResultWriterID) {
+ BlockingActors.insert(FullResultWriterID);
+ FinishFullResultWriter();
+ }
+
+ YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
+
+ QueryResponse.Reset(ev->Release().Release());
+ TBase::Become(&TDerived::ShutdownHandler);
+ TBase::Send(TBase::SelfId(), MakeHolder<NActors::TEvents::TEvGone>());
+ }
+
+ void OnFullResultWriterShutdown() {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << "Got TEvGone";
+
+ FullResultWriterID = {};
+ }
+
+ void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ if (ev->Get()->Record.IssuesSize() == 0) {
+ DoFinish();
+ } else {
+ TBase::Send(ExecuterID, ev->Release().Release());
+ }
+ }
+
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId())
+ + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
+ + "." + ToString(ev->Get()->SourceType & 0xFFFF);
+ OnError(message, true, true);
+ }
+
+ void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ Y_VERIFY(ev->Get()->Record.GetMessageId() == WriteQueue.front().MessageId);
+ if (!WriteQueue.front().SentProcessedEvent) { // messages, received before limits exceeded, are already been reported
+ TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(WriteQueue.front().MessageId));
+ }
+ WriteQueue.pop();
+
+ if (WriteQueue.empty()) {
+ WaitingAckFromFRW = false;
+ return;
+ }
+
+ UnsafeWriteToFullResultTable();
+ }
+
void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
// FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse
TIssues issues;
@@ -244,7 +288,7 @@ namespace NYql::NDqs::NExecutionHelpers {
NActorsProto::TActorId fullResultWriterProto;
ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto);
FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto);
- WriteAllDataPartsToFullResultTable();
+ TryWriteToFullResultTable();
}
});
}
@@ -277,75 +321,47 @@ namespace NYql::NDqs::NExecutionHelpers {
YQL_LOG(DEBUG) << __FUNCTION__;
}
- bool CanSendToFullResultWriter() {
- // TODO Customize
- return FullResultSentBytes - FullResultReceivedBytes <= 32_MB;
- }
-
- template <class TCallback>
- void UpdateEventQueueStatus(TCallback callback) {
- YQL_LOG(DEBUG) << "UpdateEQStatus before: sent " << (FullResultSentBytes / 1024.0) << " kB "
- << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
- << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
- TBase::Send(FullResultWriterID, new TEvFullResultWriterStatusRequest());
- TBase::template Synchronize<TEvFullResultWriterStatusResponse>([this, callback](TEvFullResultWriterStatusResponse::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
- this->FullResultReceivedBytes = ev->Get()->Record.GetBytesReceived();
- YQL_LOG(DEBUG) << "UpdateEQStatus after: sent " << (FullResultSentBytes / 1024.0) << " kB "
- << " received " << (FullResultReceivedBytes / 1024.0) << " kB "
- << " diff " << (FullResultSentBytes - FullResultReceivedBytes) / 1024.0 << " kB";
- if (ev->Get()->Record.HasErrorMessage()) {
- YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage();
- OnError(ev->Get()->Record.GetErrorMessage(), false, false);
- return;
- }
- callback();
- });
- }
-
- void WriteAllDataPartsToFullResultTable() {
- while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) {
- UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]);
- DataParts[FullResultSentDataParts].Clear();
- ++FullResultSentDataParts;
- }
- if (FullResultSentDataParts == DataParts.size()) {
+ void TryWriteToFullResultTable() {
+ if (WaitingAckFromFRW) {
return;
}
- // here we cannot continue since the event queue is overloaded
- // kind of tail recursion (but without recursion)
- UpdateEventQueueStatus([this]() {
- WriteAllDataPartsToFullResultTable();
- });
+ WaitingAckFromFRW = true;
+ UnsafeWriteToFullResultTable();
}
- void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) {
- if (CanSendToFullResultWriter()) {
- UnsafeWriteToFullResultTable(*data);
- return;
- }
- UpdateEventQueueStatus([this, data]() {
- WriteToFullResultTable(data);
- });
+ void UnsafeWriteToFullResultTable() {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(WriteQueue.front().WriteRequest)));
}
- void UnsafeWriteToFullResultTable(const NDqProto::TData& data) {
- NDqProto::TPullResponse response;
- response.SetResponseType(NDqProto::EPullResponseType::CONTINUE);
- response.MutableData()->CopyFrom(data);
- ui64 respSize = response.ByteSizeLong();
- TBase::Send(FullResultWriterID, MakeHolder<TEvPullDataResponse>(response));
- FullResultSentBytes += respSize;
- }
+ private:
+ struct TQueueItem {
+ TQueueItem(NDqProto::TData&& data, const TString& messageId)
+ : WriteRequest()
+ , MessageId(messageId)
+ , SentProcessedEvent(false) {
+ *WriteRequest.MutableData() = std::move(data);
+ WriteRequest.SetMessageId(messageId);
+ }
+
+ NDqProto::TFullResultWriterWriteRequest WriteRequest;
+ const TString MessageId;
+ bool SentProcessedEvent;
+ };
protected:
- const bool FullResultTableEnabled;
const NActors::TActorId ExecuterID;
- const NActors::TActorId GraphExecutionEventsId;
- const bool Discard;
- TVector<NDqProto::TData> DataParts;
const TString TraceId;
TDqConfiguration::TPtr Settings;
+ bool FinishCalled;
+ bool EarlyFinish;
+
+ private:
+ const bool FullResultTableEnabled;
+ const NActors::TActorId GraphExecutionEventsId;
+ const bool Discard;
+ TQueue<TQueueItem> WriteQueue;
ui64 SizeLimit;
TMaybe<ui64> RowsLimit;
ui64 Rows;
@@ -355,15 +371,9 @@ namespace NYql::NDqs::NExecutionHelpers {
TString ResultYson;
THolder<TCountingOutput> ResultYsonOut;
THolder<NYson::TYsonWriter> ResultYsonWriter;
-
- ui64 FullResultSentBytes;
- ui64 FullResultReceivedBytes;
- ui64 FullResultSentDataParts;
-
TIssues Issues;
- bool FinishCalled;
-
THashSet<NActors::TActorId> BlockingActors;
THolder<TEvQueryResponse> QueryResponse;
+ bool WaitingAckFromFRW;
};
} // namespace NYql::NDqs::NExecutionHelpers
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index 44ab1451d9..538f0c9b65 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -27,6 +27,7 @@
#include <library/cpp/yson/writer.h>
#include <util/generic/ptr.h>
+#include <util/generic/guid.h>
#include <util/string/split.h>
#include <util/system/types.h>
#include <util/stream/str.h>
@@ -50,6 +51,7 @@ using namespace NActors;
namespace {
class TResultAggregator: public TResultActorBase<TResultAggregator> {
+ using TBase = TResultActorBase<TResultAggregator>;
static constexpr ui32 MAX_RESULT_BATCH = 2048;
public:
@@ -57,7 +59,8 @@ public:
explicit TResultAggregator(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId,
const TDqConfiguration::TPtr& settings, const TString& resultType, NActors::TActorId graphExecutionEventsId, bool discard)
- : TResultActorBase<TResultAggregator>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) {
+ : TBase(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard)
+ , Continue(false) {
if (Settings) {
PullRequestTimeout = TDuration::MilliSeconds(settings->PullRequestTimeoutMs.Get().GetOrElse(0));
PingTimeout = TDuration::MilliSeconds(settings->PingTimeoutMs.Get().GetOrElse(0));
@@ -66,26 +69,37 @@ public:
}
public:
-#define HANDLER_STUB(TEvType) \
- cFunc(TEvType::EventType, [this]() { \
- YQL_LOG_CTX_SCOPE(TraceId); \
- YQL_LOG(DEBUG) << "Unexpected event " << ( #TEvType ); \
- })
-
- STRICT_STFUNC(Handler, {
- HFunc(TEvPullResult, OnPullResult);
- HFunc(TEvReadyState, OnReadyState);
- HFunc(TEvPullDataResponse, OnPullResponse);
- HFunc(TEvPingResponse, OnPingResponse);
- HFunc(TEvQueryResponse, OnQueryResult);
- HFunc(TEvDqFailure, OnFullResultWriterResponse);
- cFunc(TEvents::TEvPoison::EventType, PassAway)
- hFunc(TEvents::TEvUndelivered, OnUndelivered)
- cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
- cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown)
- })
+ STFUNC(Handler) {
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ HFunc(TEvPullDataResponse, OnPullResponse);
+ cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
+ sFunc(TEvMessageProcessed, OnMessageProcessed)
+ HFunc(TEvPullResult, OnPullResult);
+ HFunc(TEvReadyState, OnReadyState);
+ HFunc(TEvPingResponse, OnPingResponse);
+ default:
+ TBase::HandlerBase(ev, ctx);
+ }
+ }
+
+ STFUNC(ShutdownHandler) {
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ sFunc(TEvMessageProcessed, OnMessageProcessed);
+ default:
+ TBase::ShutdownHandlerBase(ev, ctx);
+ }
+ }
private:
+ void OnMessageProcessed() {
+ if (!Continue) {
+ return;
+ }
+
+ Continue = false;
+ Send(SelfId(), MakeHolder<TEvPullResult>());
+ }
+
void OnWakeup() {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << __FUNCTION__;
@@ -148,7 +162,7 @@ private:
switch (response.GetResponseType()) {
case NYql::NDqProto::CONTINUE: {
- Send(SelfId(), MakeHolder<TEvPullResult>());
+ Continue = true;
} break;
case NYql::NDqProto::FINISH:
Finish();
@@ -169,9 +183,11 @@ private:
break;
}
- OnReceiveData(std::move(*response.MutableData()));
+ // guid here is redundant and serves only for logic validation
+ OnReceiveData(std::move(*response.MutableData()), TGUID::Create().AsGuidString());
}
+private:
NActors::TActorId SourceID;
TDuration PullRequestTimeout;
TDuration PingTimeout;
@@ -180,6 +196,7 @@ private:
TInstant PullRequestStartTime;
bool PingRequested = false;
NActors::TSchedulerCookieHolder TimerCookieHolder;
+ bool Continue;
};
class TResultPrinter: public TActor<TResultPrinter> {
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
index 2e0a7bfbdd..7c743e7f60 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
@@ -34,51 +34,91 @@ namespace {
class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TResultReceiver> {
public:
+ using TBase = TResultActorBase<TResultReceiver>;
+
static constexpr char ActorName[] = "YQL_DQ_RESULT_RECEIVER";
explicit TResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings,
const THashMap<TString, TString>& /*secureParams*/, const TString& resultType, const NActors::TActorId& graphExecutionEventsId, bool discard)
- : TResultActorBase<TResultReceiver>(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard) {
+ : TBase(columns, executerId, traceId, settings, resultType, graphExecutionEventsId, discard)
+ , PendingMessages() {
}
public:
- STRICT_STFUNC(Handler, {
- HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData)
- HFunc(TEvReadyState, OnReadyState);
- HFunc(TEvQueryResponse, OnQueryResult);
- cFunc(TEvents::TEvPoison::EventType, PassAway);
- HFunc(TEvDqFailure, OnFullResultWriterResponse);
- hFunc(TEvents::TEvUndelivered, OnUndelivered);
- cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown);
- sFunc(TEvResultReceiverFinish, Finish);
- })
+ STFUNC(Handler) {
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData);
+ HFunc(TEvReadyState, OnReadyState);
+ hFunc(TEvMessageProcessed, OnMessageProcessed);
+ default:
+ TBase::HandlerBase(ev, ctx);
+ }
+ }
+
+ STFUNC(ShutdownHandler) {
+ switch (const ui32 etype = ev->GetTypeRewrite()) {
+ hFunc(TEvMessageProcessed, OnMessageProcessed);
+ default:
+ TBase::ShutdownHandlerBase(ev, ctx);
+ }
+ }
private:
void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << __FUNCTION__;
+ bool finishRequested = ev->Get()->Record.GetChannelData().GetFinished();
if (!FinishCalled) {
- OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData()));
+ const auto messageId = GetMessageId(ev);
+ const auto hasData = ev->Get()->Record.GetChannelData().HasData();
+ OnReceiveData(std::move(*ev->Get()->Record.MutableChannelData()->MutableData()), messageId, !hasData);
+ const auto [it, inserted] = PendingMessages.insert({messageId, std::move(ev)});
+ Y_ENSURE(inserted);
}
- if (!FinishCalled && ev->Get()->Record.GetChannelData().GetFinished()) {
- Send(SelfId(), MakeHolder<TEvResultReceiverFinish>()); // postpone finish until TFullResultWriterActor is instaniated
- }
-
- // todo: ack after data is stored to yt?
- auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
- res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId());
- res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
- res->Record.SetFreeSpace(256_MB);
- res->Record.SetFinish(FinishCalled); // set if premature finish started (when response limit reached and FullResultTable not enabled)
- Send(ev->Sender, res.Release());
- YQL_LOG(DEBUG) << "Finished: " << ev->Get()->Record.GetChannelData().GetFinished();
+ YQL_LOG(DEBUG) << "Finished: " << finishRequested;
}
void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) {
// do nothing
}
+
+ void OnMessageProcessed(TEvMessageProcessed::TPtr& ev) {
+ YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG(DEBUG) << __FUNCTION__;
+ SendAck(ev->Get()->MessageId);
+ }
+
+ void SendAck(const TString& messageId) {
+ const auto messageIt = PendingMessages.find(messageId);
+ Y_VERIFY(messageIt != PendingMessages.end());
+ const auto& message = messageIt->second;
+
+ auto req = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
+ req->Record.SetChannelId(message->Get()->Record.GetChannelData().GetChannelId());
+ req->Record.SetSeqNo(message->Get()->Record.GetSeqNo());
+ req->Record.SetFreeSpace(256_MB);
+ req->Record.SetFinish(EarlyFinish); // set if premature finish started (when response limit reached and FullResultTable not enabled)
+
+ Send(message->Sender, req.Release());
+ PendingMessages.erase(messageIt);
+ }
+
+ TString GetMessageId(const NDq::TEvDqCompute::TEvChannelData::TPtr& message) const {
+ TStringBuilder res;
+ res << message->Get()->Record.GetChannelData().GetChannelId()
+ << " " << message->Get()->Record.GetSeqNo()
+ << " " << ToString(message->Sender);
+ return res;
+ }
+
+ void FinishFullResultWriter() override {
+ Finish();
+ }
+
+private:
+ THashMap<TString, NDq::TEvDqCompute::TEvChannelData::TPtr> PendingMessages; // N.B. TEvChannelData is partially moved
};
} /* namespace */
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 09aecfcf92..309f1ab73d 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -232,14 +232,26 @@ message TGraphExecutionEvent {
}
}
+// legacy
message TFullResultWriterStatusRequest {
}
+// legacy
message TFullResultWriterStatusResponse {
uint64 BytesReceived = 1;
optional string ErrorMessage = 2;
}
+message TFullResultWriterWriteRequest {
+ TData Data = 1;
+ bool Finish = 2;
+ bytes MessageId = 3;
+}
+
+message TFullResultWriterAck {
+ bytes MessageId = 1;
+}
+
message TDqTaskPrepareResult {
bool Result = 1;
repeated TMetric Metric = 2;