aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author vitalyisaev <vitalyisaev@yandex-team.com> 2023-07-25 11:36:46 +0300
committer root <root@qavm-2ed34686.qemu> 2023-07-25 11:36:46 +0300
commit 0b882c94a79631eb829db45cdb1c60612fb90fd1 (patch)
tree 760066c61512303c59f1f510555faff32f5937c3
parent a8d09bf9e0b3e44c19dbfcc49e7f4d558513897b (diff)
download ydb-0b882c94a79631eb829db45cdb1c60612fb90fd1.tar.gz
YQ Connector: support a rough limit on the number of rows that can be read from the data source per request
Пока на стороне YQ не реализован стримовый С++ клиент к Коннектору с backpressure, защищаемся от возможного OOM в YQ с помощью ограничения на количество строк, извлечённых из источника данных в течение 1 запроса. Для этого добавлена новая настройка в конфиг сервиса Коннектор. Эта функциональность станет не нужна после https://st.yandex-team.ru/.
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp 25
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp 4
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/error.cpp 19
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/error.h 5
-rw-r--r-- ydb/library/yql/providers/generic/connector/libcpp/ya.make 1
9 files changed, 44 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index 7ab3cefb418..e7c81286227 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -45,11 +45,11 @@ namespace NYql::NDq {
};
struct TEvReadError: public TEventLocal<TEvReadError, EvReadError> {
- TEvReadError(TIssues&& error)
- : Error(std::move(error))
+ TEvReadError(const NYql::NConnector::NApi::TError& error)
+ : Error(error)
{
}
- TIssues Error;
+ NYql::NConnector::NApi::TError Error;
};
};
@@ -79,13 +79,13 @@ namespace NYql::NDq {
auto listSplitsResult = ConnectorClient_->ListSplits(listSplitsRequest);
if (!NConnector::ErrorIsSuccess(listSplitsResult->Error)) {
- YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure" << listSplitsResult->Error.DebugString();
+ YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure: " << listSplitsResult->Error.DebugString();
ActorSystem_->Send(new IEventHandle(
- SelfId(), TActorId(), new TEvPrivate::TEvReadError(NConnector::ErrorToIssues(listSplitsResult->Error))));
+ SelfId(), TActorId(), new TEvPrivate::TEvReadError(listSplitsResult->Error)));
return;
}
- YQL_CLOG(INFO, ProviderGeneric) << "ListSplits succeess, total splits: " << listSplitsResult->Splits.size();
+ YQL_CLOG(INFO, ProviderGeneric) << "ListSplits success, total splits: " << listSplitsResult->Splits.size();
NConnector::NApi::TReadSplitsRequest readSplitsRequest;
readSplitsRequest.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
@@ -96,14 +96,14 @@ namespace NYql::NDq {
readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_);
auto readSplitsResult = ConnectorClient_->ReadSplits(readSplitsRequest);
- if (!NConnector::ErrorIsSuccess(listSplitsResult->Error)) {
- YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure" << readSplitsResult->Error.DebugString();
+ if (!NConnector::ErrorIsSuccess(readSplitsResult->Error)) {
+ YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure: " << readSplitsResult->Error.DebugString();
ActorSystem_->Send(new IEventHandle(
- SelfId(), TActorId(), new TEvPrivate::TEvReadError(NConnector::ErrorToIssues(listSplitsResult->Error))));
+ SelfId(), TActorId(), new TEvPrivate::TEvReadError(readSplitsResult->Error)));
return;
}
- YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits succeess, total batches: "
+ YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits success, total batches: "
<< readSplitsResult->RecordBatches.size();
ActorSystem_->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult)));
@@ -186,7 +186,10 @@ namespace NYql::NDq {
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
Send(ComputeActorId_,
- new TEvAsyncInputError(InputIndex_, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ new TEvAsyncInputError(
+ InputIndex_,
+ NConnector::ErrorToIssues(result->Get()->Error),
+ NConnector::ErrorToDqStatus(result->Get()->Error)));
}
// IActor & IDqComputeActorAsyncInput
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
index d728b6ed10c..03898c1e9af 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC
contrib-libs-grpc
formats-arrow-serializer
library-yql-ast
+ dq-actors-protos
providers-common-proto
connector-api-common
connector-api-service
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
index 3f1f81448ef..1c37c3f1af2 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC
contrib-libs-grpc
formats-arrow-serializer
library-yql-ast
+ dq-actors-protos
providers-common-proto
connector-api-common
connector-api-service
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
index 3f1f81448ef..1c37c3f1af2 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC
contrib-libs-grpc
formats-arrow-serializer
library-yql-ast
+ dq-actors-protos
providers-common-proto
connector-api-common
connector-api-service
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
index d728b6ed10c..03898c1e9af 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC
contrib-libs-grpc
formats-arrow-serializer
library-yql-ast
+ dq-actors-protos
providers-common-proto
connector-api-common
connector-api-service
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp
index 487b997f0ea..c1d69f5552e 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp
@@ -91,6 +91,10 @@ namespace NYql::NConnector {
// preserve server error
out->Error = response.error();
+ if (!ErrorIsSuccess(out->Error)) {
+ break;
+ }
+
// convert our own columnar format into arrow batch
out->RecordBatches.push_back(APIReadSplitsResponseToArrowRecordBatch(response));
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
index 448b90a4d14..f72b09a1f41 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
@@ -30,11 +30,28 @@ namespace NYql::NConnector {
issues.AddIssue(TIssue(error.message()));
// convert detailed errors
- IssuesFromMessage(error.get_arr_issues(), issues);
+ for (auto& subIssue : error.get_arr_issues()) {
+ issues.AddIssue(IssueFromMessage(subIssue));
+ }
return issues;
}
+ NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error) {
+ switch (error.status()) {
+ case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
+ return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST;
+ case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR:
+ return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR;
+ case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_UNSUPPORTED:
+ return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSUPPORTED;
+ case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_NOT_FOUND:
+ return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST;
+ default:
+ ythrow yexception() << "Unexpected YDB status code: " << ::Ydb::StatusIds::StatusCode_Name(error.status());
+ }
+ }
+
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) {
YQL_ENSURE(!ErrorIsSuccess(error));
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.h b/ydb/library/yql/providers/generic/connector/libcpp/error.h
index 5a3e745177a..6ffb105311c 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.h
@@ -1,15 +1,16 @@
#pragma once
#include <grpcpp/support/status.h>
+
#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
namespace NYql::NConnector {
-
bool ErrorIsUninitialized(const NApi::TError& error) noexcept;
bool ErrorIsSuccess(const NApi::TError& error);
TIssues ErrorToIssues(const NApi::TError& error);
+ NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error);
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary);
NApi::TError ErrorFromGRPCStatus(const grpc::Status& status);
-
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
index db966672fda..015ee2204c7 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
@@ -12,6 +12,7 @@ PEERDIR(
contrib/libs/grpc
ydb/core/formats/arrow/serializer
ydb/library/yql/ast
+ ydb/library/yql/dq/actors/protos
ydb/library/yql/providers/common/proto
ydb/library/yql/providers/generic/connector/api/common
ydb/library/yql/providers/generic/connector/api/service