summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-08-07 11:53:35 +0300
committervitalyisaev <[email protected]>2023-08-07 12:38:39 +0300
commitf7a9ec9e2cf0250943edb3b1dd331a4ed766b8da (patch)
tree2218f3dbb5d73d816dcf3720b7b8b7410919c8e6
parent609b1bd159b40b1f2819df6b9e22aede02f50ce5 (diff)
YQ Connector: fix column selection
Теперь стали возможны запросы вида * `SELECT col1 FROM ...` * `SELECT col1 + col2 FROM ...` * `SELECT col1 AS col2 FROM ...`
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp30
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp52
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp8
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp2
4 files changed, 56 insertions, 36 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index e7c81286227..f3133799d2e 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -134,31 +134,31 @@ namespace NYql::NDq {
ui64 total = 0;
- for (const auto& batch : Result_->RecordBatches) {
- total += NUdf::GetSizeOfArrowBatchInBytes(*batch);
+ // It's very important to fill UV columns in the alphabet order,
+ // paying attention to the scalar field containing block length.
+ TVector<TString> fieldNames;
+ std::transform(Select_.what().items().cbegin(), Select_.what().items().cend(),
+ std::back_inserter(fieldNames), [](const auto& item) { return item.column().name(); });
- YQL_CLOG(TRACE, ProviderGeneric) << "Converting arrow::RecordBatch into NUdf::UnboxedValue:\n"
- << batch->ToString();
+ fieldNames.push_back(std::string(BlockLengthColumnName));
+ std::sort(fieldNames.begin(), fieldNames.end());
+ std::map<TStringBuf, std::size_t> fieldNameOrder;
+ for (std::size_t i = 0; i < fieldNames.size(); i++) {
+ fieldNameOrder[fieldNames[i]] = i;
+ }
- // It's very important to fill UV column in the alphabet order,
- // paying attention to the scalar field containing block length.
- auto fieldNames = batch->schema()->field_names();
- fieldNames.push_back(std::string(BlockLengthColumnName));
- std::sort(fieldNames.begin(), fieldNames.end());
- std::map<std::string, std::size_t> fieldNameOrder;
- for (std::size_t i = 0; i < fieldNames.size(); i++) {
- fieldNameOrder[fieldNames[i]] = i;
- }
+ for (const auto& batch : Result_->RecordBatches) {
+ total += NUdf::GetSizeOfArrowBatchInBytes(*batch);
NUdf::TUnboxedValue* structItems = nullptr;
- auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, 1 + batch->num_columns(), structItems);
+ auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, fieldNames.size(), structItems);
for (int i = 0; i < batch->num_columns(); ++i) {
const auto& columnName = batch->schema()->field(i)->name();
const auto ix = fieldNameOrder[columnName];
structItems[ix] = HolderFactory_.CreateArrowBlock(arrow::Datum(batch->column(i)));
}
- structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory_.CreateArrowBlock(
+ structItems[fieldNameOrder[BlockLengthColumnName]] = HolderFactory_.CreateArrowBlock(
arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch->num_rows())));
value = structObj;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
index 6c6959d331f..801ae451642 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
@@ -53,6 +53,11 @@ namespace NYql {
TString clusterName{input->Child(TGenSourceSettings::idx_Cluster)->Content()};
TString tableName{input->Child(TGenSourceSettings::idx_Table)->Content()};
+ THashSet<TStringBuf> columnSet;
+ for (const auto& child : input->Child(TGenSourceSettings::idx_Columns)->Children()) {
+ columnSet.insert(child->Content());
+ }
+
auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos()));
if (issue.has_value()) {
ctx.AddError(issue.value());
@@ -64,18 +69,18 @@ namespace NYql {
const auto structExprType = tableMeta.value()->ItemType;
for (const auto& item : structExprType->GetItems()) {
- blockRowTypeItems.push_back(
- ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType())));
+ // Filter out columns that are not required in this query
+ if (columnSet.contains(item->GetName())) {
+ blockRowTypeItems.push_back(
+ ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType())));
+ }
}
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(
BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));
const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems);
- // Struct column order
- YQL_CLOG(INFO, ProviderGeneric)
- << "StructExprType column order:"
- << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString();
+ YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString();
auto streamExprType = ctx.MakeType<TStreamExprType>(typeAnnotationNode);
input->SetTypeAnn(streamExprType);
@@ -105,21 +110,21 @@ namespace NYql {
return TStatus::Error;
}
- TMaybe<THashSet<TStringBuf>> columnsSet;
+ TMaybe<THashSet<TStringBuf>> columnSet;
auto columns = input->Child(TGenReadTable::idx_Columns);
if (!columns->IsCallable(TCoVoid::CallableName())) {
if (!EnsureTuple(*columns, ctx)) {
return TStatus::Error;
}
- columnsSet.ConstructInPlace();
+ columnSet.ConstructInPlace();
for (auto& child : columns->Children()) {
if (!EnsureAtom(*child, ctx)) {
return TStatus::Error;
}
auto name = child->Content();
- if (!columnsSet->insert(name).second) {
+ if (!columnSet->insert(name).second) {
ctx.AddError(
TIssue(ctx.GetPosition(child->Pos()), TStringBuilder() << "Duplicated column name: " << name));
return TStatus::Error;
@@ -139,13 +144,15 @@ namespace NYql {
auto itemType = tableMeta.value()->ItemType;
auto columnOrder = tableMeta.value()->ColumnOrder;
- YQL_CLOG(INFO, ProviderGeneric) << "Custom column order:" << StateColumnOrderToString(columnOrder);
+ if (columnSet) {
+ YQL_CLOG(INFO, ProviderGeneric) << "custom column set" << ColumnSetToString(*columnSet.Get());
- if (columnsSet) {
TVector<const TItemExprType*> items = itemType->GetItems();
- EraseIf(items, [&](const TItemExprType* item) { return !columnsSet->contains(item->GetName()); });
- EraseIf(columnOrder, [&](const TString& col) { return !columnsSet->contains(col); });
+ EraseIf(items, [&](const TItemExprType* item) { return !columnSet->contains(item->GetName()); });
+ EraseIf(columnOrder, [&](const TString& col) { return !columnSet->contains(col); });
itemType = ctx.MakeType<TStructExprType>(items);
+
+ YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(itemType))->ToString();
}
input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
@@ -154,11 +161,11 @@ namespace NYql {
return State_->Types->SetColumnOrder(*input, columnOrder, ctx);
}
- TString StateColumnOrderToString(const TVector<TString>& columns) {
+ TString ColumnOrderToString(const TVector<TString>& columns) {
TStringBuilder sb;
for (std::size_t i = 0; i < columns.size(); i++) {
- sb << i << ": " << columns[i];
+ sb << i << "=" << columns[i];
if (i != columns.size() - 1) {
sb << ", ";
}
@@ -167,6 +174,21 @@ namespace NYql {
return sb;
}
+ TString ColumnSetToString(const THashSet<TStringBuf>& columnSet) {
+ TStringBuilder sb;
+
+ std::size_t i = 0;
+ for (const auto key : columnSet) {
+ sb << i << "=" << key;
+ if (i != columnSet.size() - 1) {
+ sb << ", ";
+ }
+ i++;
+ }
+
+ return sb;
+ }
+
private:
TGenericState::TPtr State_;
};
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index 13123410d5e..623bcad3fe6 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -41,8 +41,6 @@ namespace NYql {
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
const auto genReadTable = maybeGenReadTable.Cast();
const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue();
- YQL_CLOG(INFO, ProviderGeneric) << "Wrap " << read->Content() << " with token: " << token;
-
const auto rowType = genReadTable.Ref()
.GetTypeAnn()
->Cast<TTupleExprType>()
@@ -131,11 +129,11 @@ namespace NYql {
for (size_t i = 0; i < columns.Size(); i++) {
// assign column name
auto column = items->Add()->mutable_column();
- auto column_name = columns.Item(i).StringValue();
- column->mutable_name()->assign(column_name);
+ auto columnName = columns.Item(i).StringValue();
+ column->mutable_name()->assign(columnName);
// assign column type
- auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, column_name);
+ auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, columnName);
column->mutable_type()->CopyFrom(type);
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index f3f2679b7a8..bc53eb51e43 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -243,7 +243,7 @@ namespace NYql {
items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation));
columnOrder.emplace_back(columns.Get(i).name());
}
- // FIXME: handle on Generic's side?
+ // FIXME: handle on Connector's side?
return std::make_pair(ctx.MakeType<TStructExprType>(items), TString("Europe/Moscow"));
} catch (std::exception&) {
ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));