diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-07-25 11:36:46 +0300 |
---|---|---|
committer | root <root@qavm-2ed34686.qemu> | 2023-07-25 11:36:46 +0300 |
commit | 0b882c94a79631eb829db45cdb1c60612fb90fd1 (patch) | |
tree | 760066c61512303c59f1f510555faff32f5937c3 | |
parent | a8d09bf9e0b3e44c19dbfcc49e7f4d558513897b (diff) | |
download | ydb-0b882c94a79631eb829db45cdb1c60612fb90fd1.tar.gz |
YQ Connector: support a rough limit on the number of lines that can be read from the data source per request
Пока на стороне YQ не реализован стримовый С++ клиент к Коннектору с backpressure, защищаемся от возможного OOM в YQ с помощью ограничения на количество строк, извлечённых из источника данных в течение 1 запроса. Для этого добавлена новая настройка в конфиг сервиса Коннектор.
Эта функциональность станет не нужна после https://st.yandex-team.ru/.
9 files changed, 44 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 7ab3cefb418..e7c81286227 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -45,11 +45,11 @@ namespace NYql::NDq { }; struct TEvReadError: public TEventLocal<TEvReadError, EvReadError> { - TEvReadError(TIssues&& error) - : Error(std::move(error)) + TEvReadError(const NYql::NConnector::NApi::TError& error) + : Error(error) { } - TIssues Error; + NYql::NConnector::NApi::TError Error; }; }; @@ -79,13 +79,13 @@ namespace NYql::NDq { auto listSplitsResult = ConnectorClient_->ListSplits(listSplitsRequest); if (!NConnector::ErrorIsSuccess(listSplitsResult->Error)) { - YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure" << listSplitsResult->Error.DebugString(); + YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure: " << listSplitsResult->Error.DebugString(); ActorSystem_->Send(new IEventHandle( - SelfId(), TActorId(), new TEvPrivate::TEvReadError(NConnector::ErrorToIssues(listSplitsResult->Error)))); + SelfId(), TActorId(), new TEvPrivate::TEvReadError(listSplitsResult->Error))); return; } - YQL_CLOG(INFO, ProviderGeneric) << "ListSplits succeess, total splits: " << listSplitsResult->Splits.size(); + YQL_CLOG(INFO, ProviderGeneric) << "ListSplits success, total splits: " << listSplitsResult->Splits.size(); NConnector::NApi::TReadSplitsRequest readSplitsRequest; readSplitsRequest.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING); @@ -96,14 +96,14 @@ namespace NYql::NDq { readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_); auto readSplitsResult = ConnectorClient_->ReadSplits(readSplitsRequest); - if (!NConnector::ErrorIsSuccess(listSplitsResult->Error)) { - YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure" << readSplitsResult->Error.DebugString(); + if 
(!NConnector::ErrorIsSuccess(readSplitsResult->Error)) { + YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure: " << readSplitsResult->Error.DebugString(); ActorSystem_->Send(new IEventHandle( - SelfId(), TActorId(), new TEvPrivate::TEvReadError(NConnector::ErrorToIssues(listSplitsResult->Error)))); + SelfId(), TActorId(), new TEvPrivate::TEvReadError(readSplitsResult->Error))); return; } - YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits succeess, total batches: " + YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits success, total batches: " << readSplitsResult->RecordBatches.size(); ActorSystem_->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult))); @@ -186,7 +186,10 @@ namespace NYql::NDq { void Handle(TEvPrivate::TEvReadError::TPtr& result) { Send(ComputeActorId_, - new TEvAsyncInputError(InputIndex_, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + new TEvAsyncInputError( + InputIndex_, + NConnector::ErrorToIssues(result->Get()->Error), + NConnector::ErrorToDqStatus(result->Get()->Error))); } // IActor & IDqComputeActorAsyncInput diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt index d728b6ed10c..03898c1e9af 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC contrib-libs-grpc formats-arrow-serializer library-yql-ast + dq-actors-protos providers-common-proto connector-api-common connector-api-service diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt index 3f1f81448ef..1c37c3f1af2 100644 --- 
a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC contrib-libs-grpc formats-arrow-serializer library-yql-ast + dq-actors-protos providers-common-proto connector-api-common connector-api-service diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt index 3f1f81448ef..1c37c3f1af2 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC contrib-libs-grpc formats-arrow-serializer library-yql-ast + dq-actors-protos providers-common-proto connector-api-common connector-api-service diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt index d728b6ed10c..03898c1e9af 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(generic-connector-libcpp PUBLIC contrib-libs-grpc formats-arrow-serializer library-yql-ast + dq-actors-protos providers-common-proto connector-api-common connector-api-service diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp index 487b997f0ea..c1d69f5552e 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client_grpc.cpp @@ -91,6 +91,10 @@ namespace NYql::NConnector { // preserve server error 
out->Error = response.error(); + if (!ErrorIsSuccess(out->Error)) { + break; + } + // convert our own columnar format into arrow batch out->RecordBatches.push_back(APIReadSplitsResponseToArrowRecordBatch(response)); } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp index 448b90a4d14..f72b09a1f41 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp @@ -30,11 +30,28 @@ namespace NYql::NConnector { issues.AddIssue(TIssue(error.message())); // convert detailed errors - IssuesFromMessage(error.get_arr_issues(), issues); + for (auto& subIssue : error.get_arr_issues()) { + issues.AddIssue(IssueFromMessage(subIssue)); + } return issues; } + NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error) { + switch (error.status()) { + case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: + return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST; + case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR: + return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR; + case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_UNSUPPORTED: + return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSUPPORTED; + case ::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_NOT_FOUND: + return NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST; + default: + ythrow yexception() << "Unexpected YDB status code: " << ::Ydb::StatusIds::StatusCode_Name(error.status()); + } + } + void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) { YQL_ENSURE(!ErrorIsSuccess(error)); diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.h b/ydb/library/yql/providers/generic/connector/libcpp/error.h index 5a3e745177a..6ffb105311c 100644 --- 
a/ydb/library/yql/providers/generic/connector/libcpp/error.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/error.h @@ -1,15 +1,16 @@ #pragma once #include <grpcpp/support/status.h> + #include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> #include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> namespace NYql::NConnector { - bool ErrorIsUninitialized(const NApi::TError& error) noexcept; bool ErrorIsSuccess(const NApi::TError& error); TIssues ErrorToIssues(const NApi::TError& error); + NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error); void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary); NApi::TError ErrorFromGRPCStatus(const grpc::Status& status); - } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make index db966672fda..015ee2204c7 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make +++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make @@ -12,6 +12,7 @@ PEERDIR( contrib/libs/grpc ydb/core/formats/arrow/serializer ydb/library/yql/ast + ydb/library/yql/dq/actors/protos ydb/library/yql/providers/common/proto ydb/library/yql/providers/generic/connector/api/common ydb/library/yql/providers/generic/connector/api/service |