summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Isaev <[email protected]>2025-02-07 16:43:04 +0300
committerGitHub <[email protected]>2025-02-07 16:43:04 +0300
commitd9d9787dffb47f48d6383db99f99d0b4d3a59c98 (patch)
treeb07c2bfbdbdcdb2a15dbe2a29cb5f1187f1cf4e8
parentfaa4814de5de2578f0a81308477649fa2c1283cc (diff)
YDB FQ: simplify `yql_generic_load_meta.cpp` code (#14307)
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/error.cpp18
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/error.h7
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ya.make1
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp185
4 files changed, 110 insertions, 101 deletions
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
index 7646562cf60..2ce7d8b5df0 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
@@ -12,12 +12,12 @@ namespace NYql::NConnector {
return error;
}
- TIssues ErrorToIssues(const NApi::TError& error) {
+ TIssues ErrorToIssues(const NApi::TError& error, TString prefix) {
TIssues issues;
issues.Reserve(error.get_arr_issues().size() + 1);
// add high-level error
- issues.AddIssue(TIssue(error.message()));
+ issues.AddIssue(TIssue(TStringBuilder() << prefix << error.message()));
// convert detailed errors
for (auto& subIssue : error.get_arr_issues()) {
@@ -44,20 +44,6 @@ namespace NYql::NConnector {
}
}
- void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) {
- // add high-level error
- TStringBuilder ss;
- ss << summary << ": status=" << Ydb::StatusIds_StatusCode_Name(error.status()) << ", message=" << error.message();
- ctx.AddError(TIssue(position, ss));
-
- // convert detailed errors
- TIssues issues;
- IssuesFromMessage(error.get_arr_issues(), issues);
- for (const auto& issue : issues) {
- ctx.AddError(issue);
- }
- }
-
NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status) {
NApi::TError result;
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.h b/ydb/library/yql/providers/generic/connector/libcpp/error.h
index 94218d24e93..99c3772be0b 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.h
@@ -2,10 +2,11 @@
#include <grpcpp/support/status.h>
-#include <yql/essentials/ast/yql_expr.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
+#include <yql/essentials/public/issue/yql_issue.h>
+#include <yql/essentials/utils/yql_panic.h>
namespace NYql::NConnector {
NApi::TError NewSuccess();
@@ -29,12 +30,10 @@ namespace NYql::NConnector {
return ok;
}
- TIssues ErrorToIssues(const NApi::TError& error);
+ TIssues ErrorToIssues(const NApi::TError& error, TString prefix = "");
NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error);
- void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary);
-
NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status);
inline bool GrpcStatusEndOfStream(const NYdbGrpc::TGrpcStatus& status) noexcept {
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
index 1a8e4551202..d7b7fd6fdf8 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
@@ -11,7 +11,6 @@ PEERDIR(
contrib/libs/grpc
ydb/core/formats/arrow/serializer
ydb/public/sdk/cpp/src/library/grpc/client
- yql/essentials/ast
ydb/library/yql/dq/actors/protos
yql/essentials/providers/common/proto
yql/essentials/providers/common/proto
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index d243d597e56..1628db86aca 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -34,7 +34,7 @@ namespace NYql {
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
};
- using TMapType =
+ using TDescribeTableMap =
std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;
public:
@@ -50,7 +50,7 @@ namespace NYql {
return TStatus::Ok;
}
- std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables;
+ std::unordered_set<TDescribeTableMap::key_type, TDescribeTableMap::hasher> pendingTables;
const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
@@ -146,110 +146,135 @@ namespace NYql {
});
TNodeOnNodeOwnedMap replaces(reads.size());
- bool hasErrors = false;
for (const auto& r : reads) {
- const TGenRead read(r);
- const auto clusterName = read.DataSource().Cluster().StringValue();
- const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
- const auto tableName = TString(keyArg.Tail().Head().Content());
+ TIssues issues = HandleDescribeTableResponse(r, ctx, replaces);
+ if (issues) {
+ for (const auto& issue : issues) {
+ ctx.AddError(issue);
+ }
- const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
- if (Results_.cend() != it) {
- const auto& response = it->second->Response;
+ return TStatus::Error;
+ }
+ }
- if (NConnector::IsSuccess(*response)) {
- TGenericState::TTableMeta tableMeta;
- tableMeta.Schema = response->schema();
- tableMeta.DataSourceInstance = it->second->DataSourceInstance;
+ return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
+ }
- const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder);
+ void Rewind() final {
+ Results_.clear();
+ AsyncFuture_ = {};
+ }
- if (parse) {
- tableMeta.ItemType = parse;
- if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
- // clang-format off
- auto row = Build<TCoArgument>(ctx, read.Pos())
- .Name("row")
- .Done();
+ private:
+ TIssues HandleDescribeTableResponse(
+ const TIntrusivePtr<TExprNode>& read,
+ TExprContext& ctx,
+ TNodeOnNodeOwnedMap& replaces
+ ) {
+ const TGenRead genRead(read);
+ const auto clusterName = genRead.DataSource().Cluster().StringValue();
+ const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
+ const auto tableName = TString(keyArg.Tail().Head().Content());
- auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
- .Args({row})
- .Body<TCoBool>()
- .Literal().Build("true")
- .Build()
- .Done().Ptr();
+ const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
- auto table = Build<TGenTable>(ctx, read.Pos())
- .Name().Value(tableName).Build()
- .Splits<TCoVoid>().Build().Done();
+ if (it == Results_.cend()) {
+ TIssues issues;
+ issues.AddIssue(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder()
+ << "Not found result for " << clusterName << '.' << tableName));
+ return issues;
+ }
- ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
- .World(read.World())
- .DataSource(read.DataSource())
- .Table(table)
- .Columns<TCoVoid>().Build()
- .FilterPredicate(emptyPredicate)
- .Done().Ptr();
- // clang-format on
- }
- State_->AddTable(clusterName, tableName, std::move(tableMeta));
- } else {
- hasErrors = true;
- break;
- }
- } else {
- const auto& error = response->error();
- NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()),
- TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName);
- hasErrors = true;
- break;
- }
- } else {
- ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder()
- << "Not found result for " << clusterName << '.' << tableName));
- hasErrors = true;
- break;
- }
+ const auto& response = it->second->Response;
+
+ if (!NConnector::IsSuccess(*response)) {
+ return NConnector::ErrorToIssues(
+ response->error(),
+ TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName
+ );
}
- if (hasErrors) {
- return TStatus::Error;
+ TGenericState::TTableMeta tableMeta;
+ tableMeta.Schema = response->schema();
+ tableMeta.DataSourceInstance = it->second->DataSourceInstance;
+
+ auto issues = ParseTableMeta(ctx, ctx.GetPosition(read->Pos()), clusterName, tableName, tableMeta);
+ if (issues) {
+ return issues;
}
- return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
- }
+ if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) {
+ ins.first->second = MakeTableMetaNode(ctx, genRead, tableName);
+ }
- void Rewind() final {
- Results_.clear();
- AsyncFuture_ = {};
+ State_->AddTable(clusterName, tableName, std::move(tableMeta));
+ return TIssues{};
}
- private:
- const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster,
- const std::string_view& table, TExprContext& ctx, TVector<TString>& columnOrder) try {
+ TIssues ParseTableMeta(
+ TExprContext& ctx,
+ const TPosition& pos,
+ const std::string_view& cluster,
+ const std::string_view& table,
+ TGenericState::TTableMeta& tableMeta
+ ) try {
TVector<const TItemExprType*> items;
- auto columns = schema.columns();
+ const auto& columns = tableMeta.Schema.columns();
if (columns.empty()) {
- ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
- return nullptr;
+ TIssues issues;
+ issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
+ return issues;
}
- for (auto i = 0; i < columns.size(); i++) {
+ for (const auto& column: columns) {
// Make type annotation
- NYdb::TTypeParser parser(columns.Get(i).type());
+ NYdb::TTypeParser parser(column.type());
auto typeAnnotation = NFq::MakeType(parser, ctx);
// Create items from graph
- items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation));
- columnOrder.emplace_back(columns.Get(i).name());
+ items.emplace_back(ctx.MakeType<TItemExprType>(column.name(), typeAnnotation));
+ tableMeta.ColumnOrder.emplace_back(column.name());
}
- // FIXME: handle on Connector's side?
- return ctx.MakeType<TStructExprType>(items);
+
+ tableMeta.ItemType = ctx.MakeType<TStructExprType>(items);
+ return TIssues{};
} catch (std::exception&) {
- ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
- return nullptr;
+ TIssues issues;
+ issues.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
+ return issues;
+ }
+
+ TExprNode::TPtr MakeTableMetaNode(
+ TExprContext& ctx,
+ const TGenRead& read,
+ const TString& tableName
+ ) {
+ // clang-format off
+ auto row = Build<TCoArgument>(ctx, read.Pos())
+ .Name("row")
+ .Done();
+
+ auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
+ .Args({row})
+ .Body<TCoBool>()
+ .Literal().Build("true")
+ .Build()
+ .Done().Ptr();
+
+ auto table = Build<TGenTable>(ctx, read.Pos())
+ .Name().Value(tableName).Build()
+ .Splits<TCoVoid>().Build().Done();
+
+ return Build<TGenReadTable>(ctx, read.Pos())
+ .World(read.World())
+ .DataSource(read.DataSource())
+ .Table(table)
+ .Columns<TCoVoid>().Build()
+ .FilterPredicate(emptyPredicate)
+ .Done().Ptr();
+ // clang-format on
}
void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig,
@@ -402,7 +427,7 @@ namespace NYql {
private:
const TGenericState::TPtr State_;
- TMapType Results_;
+ TDescribeTableMap Results_;
NThreading::TFuture<void> AsyncFuture_;
};