diff options
| author | Vitaly Isaev <[email protected]> | 2025-02-07 16:43:04 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-02-07 16:43:04 +0300 |
| commit | d9d9787dffb47f48d6383db99f99d0b4d3a59c98 (patch) | |
| tree | b07c2bfbdbdcdb2a15dbe2a29cb5f1187f1cf4e8 | |
| parent | faa4814de5de2578f0a81308477649fa2c1283cc (diff) | |
YDB FQ: simplify `yql_generic_load_meta.cpp` code (#14307)
4 files changed, 110 insertions, 101 deletions
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp index 7646562cf60..2ce7d8b5df0 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp @@ -12,12 +12,12 @@ namespace NYql::NConnector { return error; } - TIssues ErrorToIssues(const NApi::TError& error) { + TIssues ErrorToIssues(const NApi::TError& error, TString prefix) { TIssues issues; issues.Reserve(error.get_arr_issues().size() + 1); // add high-level error - issues.AddIssue(TIssue(error.message())); + issues.AddIssue(TIssue(TStringBuilder() << prefix << error.message())); // convert detailed errors for (auto& subIssue : error.get_arr_issues()) { @@ -44,20 +44,6 @@ namespace NYql::NConnector { } } - void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) { - // add high-level error - TStringBuilder ss; - ss << summary << ": status=" << Ydb::StatusIds_StatusCode_Name(error.status()) << ", message=" << error.message(); - ctx.AddError(TIssue(position, ss)); - - // convert detailed errors - TIssues issues; - IssuesFromMessage(error.get_arr_issues(), issues); - for (const auto& issue : issues) { - ctx.AddError(issue); - } - } - NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status) { NApi::TError result; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.h b/ydb/library/yql/providers/generic/connector/libcpp/error.h index 94218d24e93..99c3772be0b 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/error.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/error.h @@ -2,10 +2,11 @@ #include <grpcpp/support/status.h> -#include <yql/essentials/ast/yql_expr.h> #include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> #include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> #include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h> +#include <yql/essentials/public/issue/yql_issue.h> +#include <yql/essentials/utils/yql_panic.h> namespace NYql::NConnector { NApi::TError NewSuccess(); @@ -29,12 +30,10 @@ namespace NYql::NConnector { return ok; } - TIssues ErrorToIssues(const NApi::TError& error); + TIssues ErrorToIssues(const NApi::TError& error, TString prefix = ""); NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error); - void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary); - NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status); inline bool GrpcStatusEndOfStream(const NYdbGrpc::TGrpcStatus& status) noexcept { diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make index 1a8e4551202..d7b7fd6fdf8 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make +++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make @@ -11,7 +11,6 @@ PEERDIR( contrib/libs/grpc ydb/core/formats/arrow/serializer ydb/public/sdk/cpp/src/library/grpc/client - yql/essentials/ast ydb/library/yql/dq/actors/protos yql/essentials/providers/common/proto yql/essentials/providers/common/proto diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index d243d597e56..1628db86aca 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -34,7 +34,7 @@ namespace NYql { std::optional<NConnector::NApi::TDescribeTableResponse> Response; }; - using TMapType = + using TDescribeTableMap = std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>; public: @@ -50,7 +50,7 @@ namespace NYql { return TStatus::Ok; } - std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables; + std::unordered_set<TDescribeTableMap::key_type, TDescribeTableMap::hasher> pendingTables; const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { if (const auto maybeRead = TMaybeNode<TGenRead>(node)) { return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; @@ -146,110 +146,135 @@ namespace NYql { }); TNodeOnNodeOwnedMap replaces(reads.size()); - bool hasErrors = false; for (const auto& r : reads) { - const TGenRead read(r); - const auto clusterName = read.DataSource().Cluster().StringValue(); - const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head(); - const auto tableName = TString(keyArg.Tail().Head().Content()); + TIssues issues = HandleDescribeTableResponse(r, ctx, replaces); + if (issues) { + for (const auto& issue : issues) { + ctx.AddError(issue); + } - const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName)); - if (Results_.cend() != it) { - const auto& response = it->second->Response; + return TStatus::Error; + } + } - if (NConnector::IsSuccess(*response)) { - TGenericState::TTableMeta tableMeta; - tableMeta.Schema = response->schema(); - tableMeta.DataSourceInstance = it->second->DataSourceInstance; + return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); + } - const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder); + void Rewind() final { + Results_.clear(); + AsyncFuture_ = {}; + } - if (parse) { - tableMeta.ItemType = parse; - if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) { - // clang-format off - auto row = Build<TCoArgument>(ctx, read.Pos()) - .Name("row") - .Done(); + private: + TIssues HandleDescribeTableResponse( + const TIntrusivePtr<TExprNode>& read, + TExprContext& ctx, + TNodeOnNodeOwnedMap& replaces + ) { + const TGenRead genRead(read); + const auto clusterName = genRead.DataSource().Cluster().StringValue(); + const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head(); + const auto tableName = TString(keyArg.Tail().Head().Content()); - auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos()) - .Args({row}) - .Body<TCoBool>() - .Literal().Build("true") - .Build() - .Done().Ptr(); + const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName)); - auto table = Build<TGenTable>(ctx, read.Pos()) - .Name().Value(tableName).Build() - .Splits<TCoVoid>().Build().Done(); + if (it == Results_.cend()) { + TIssues issues; + issues.AddIssue(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder() + << "Not found result for " << clusterName << '.' << tableName)); + return issues; + } - ins.first->second = Build<TGenReadTable>(ctx, read.Pos()) - .World(read.World()) - .DataSource(read.DataSource()) - .Table(table) - .Columns<TCoVoid>().Build() - .FilterPredicate(emptyPredicate) - .Done().Ptr(); - // clang-format on - } - State_->AddTable(clusterName, tableName, std::move(tableMeta)); - } else { - hasErrors = true; - break; - } - } else { - const auto& error = response->error(); - NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()), - TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName); - hasErrors = true; - break; - } - } else { - ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() - << "Not found result for " << clusterName << '.' << tableName)); - hasErrors = true; - break; - } + const auto& response = it->second->Response; + + if (!NConnector::IsSuccess(*response)) { + return NConnector::ErrorToIssues( + response->error(), + TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName + ); } - if (hasErrors) { - return TStatus::Error; + TGenericState::TTableMeta tableMeta; + tableMeta.Schema = response->schema(); + tableMeta.DataSourceInstance = it->second->DataSourceInstance; + + auto issues = ParseTableMeta(ctx, ctx.GetPosition(read->Pos()), clusterName, tableName, tableMeta); + if (issues) { + return issues; } - return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); - } + if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) { + ins.first->second = MakeTableMetaNode(ctx, genRead, tableName); + } - void Rewind() final { - Results_.clear(); - AsyncFuture_ = {}; + State_->AddTable(clusterName, tableName, std::move(tableMeta)); + return TIssues{}; } - private: - const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster, - const std::string_view& table, TExprContext& ctx, TVector<TString>& columnOrder) try { + TIssues ParseTableMeta( + TExprContext& ctx, + const TPosition& pos, + const std::string_view& cluster, + const std::string_view& table, + TGenericState::TTableMeta& tableMeta + ) try { TVector<const TItemExprType*> items; - auto columns = schema.columns(); + const auto& columns = tableMeta.Schema.columns(); if (columns.empty()) { - ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist.")); - return nullptr; + TIssues issues; + issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist.")); + return issues; } - for (auto i = 0; i < columns.size(); i++) { + for (const auto& column: columns) { // Make type annotation - NYdb::TTypeParser parser(columns.Get(i).type()); + NYdb::TTypeParser parser(column.type()); auto typeAnnotation = NFq::MakeType(parser, ctx); // Create items from graph - items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation)); - columnOrder.emplace_back(columns.Get(i).name()); + items.emplace_back(ctx.MakeType<TItemExprType>(column.name(), typeAnnotation)); + tableMeta.ColumnOrder.emplace_back(column.name()); } - // FIXME: handle on Connector's side? - return ctx.MakeType<TStructExprType>(items); + + tableMeta.ItemType = ctx.MakeType<TStructExprType>(items); + return TIssues{}; } catch (std::exception&) { - ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage())); - return nullptr; + TIssues issues; + issues.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage())); + return issues; + } + + TExprNode::TPtr MakeTableMetaNode( + TExprContext& ctx, + const TGenRead& read, + const TString& tableName + ) { + // clang-format off + auto row = Build<TCoArgument>(ctx, read.Pos()) + .Name("row") + .Done(); + + auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos()) + .Args({row}) + .Body<TCoBool>() + .Literal().Build("true") + .Build() + .Done().Ptr(); + + auto table = Build<TGenTable>(ctx, read.Pos()) + .Name().Value(tableName).Build() + .Splits<TCoVoid>().Build().Done(); + + return Build<TGenReadTable>(ctx, read.Pos()) + .World(read.World()) + .DataSource(read.DataSource()) + .Table(table) + .Columns<TCoVoid>().Build() + .FilterPredicate(emptyPredicate) + .Done().Ptr(); + // clang-format on } void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, @@ -402,7 +427,7 @@ namespace NYql { private: const TGenericState::TPtr State_; - TMapType Results_; + TDescribeTableMap Results_; NThreading::TFuture<void> AsyncFuture_; }; |
