diff options
| author | vitalyisaev <[email protected]> | 2023-08-07 11:53:35 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-08-07 12:38:39 +0300 |
| commit | f7a9ec9e2cf0250943edb3b1dd331a4ed766b8da (patch) | |
| tree | 2218f3dbb5d73d816dcf3720b7b8b7410919c8e6 | |
| parent | 609b1bd159b40b1f2819df6b9e22aede02f50ce5 (diff) | |
YQ Connector: fix column selection
Теперь стали возможны запросы вида
* `SELECT col1 FROM ...`
* `SELECT col1 + col2 FROM ...`
* `SELECT col1 AS col2 FROM ...`
4 files changed, 56 insertions, 36 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index e7c81286227..f3133799d2e 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -134,31 +134,31 @@ namespace NYql::NDq { ui64 total = 0; - for (const auto& batch : Result_->RecordBatches) { - total += NUdf::GetSizeOfArrowBatchInBytes(*batch); + // It's very important to fill UV columns in the alphabet order, + // paying attention to the scalar field containing block length. + TVector<TString> fieldNames; + std::transform(Select_.what().items().cbegin(), Select_.what().items().cend(), + std::back_inserter(fieldNames), [](const auto& item) { return item.column().name(); }); - YQL_CLOG(TRACE, ProviderGeneric) << "Converting arrow::RecordBatch into NUdf::UnboxedValue:\n" - << batch->ToString(); + fieldNames.push_back(std::string(BlockLengthColumnName)); + std::sort(fieldNames.begin(), fieldNames.end()); + std::map<TStringBuf, std::size_t> fieldNameOrder; + for (std::size_t i = 0; i < fieldNames.size(); i++) { + fieldNameOrder[fieldNames[i]] = i; + } - // It's very important to fill UV column in the alphabet order, - // paying attention to the scalar field containing block length. - auto fieldNames = batch->schema()->field_names(); - fieldNames.push_back(std::string(BlockLengthColumnName)); - std::sort(fieldNames.begin(), fieldNames.end()); - std::map<std::string, std::size_t> fieldNameOrder; - for (std::size_t i = 0; i < fieldNames.size(); i++) { - fieldNameOrder[fieldNames[i]] = i; - } + for (const auto& batch : Result_->RecordBatches) { + total += NUdf::GetSizeOfArrowBatchInBytes(*batch); NUdf::TUnboxedValue* structItems = nullptr; - auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, 1 + batch->num_columns(), structItems); + auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, fieldNames.size(), structItems); for (int i = 0; i < batch->num_columns(); ++i) { const auto& columnName = batch->schema()->field(i)->name(); const auto ix = fieldNameOrder[columnName]; structItems[ix] = HolderFactory_.CreateArrowBlock(arrow::Datum(batch->column(i))); } - structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory_.CreateArrowBlock( + structItems[fieldNameOrder[BlockLengthColumnName]] = HolderFactory_.CreateArrowBlock( arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch->num_rows()))); value = structObj; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index 6c6959d331f..801ae451642 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -53,6 +53,11 @@ namespace NYql { TString clusterName{input->Child(TGenSourceSettings::idx_Cluster)->Content()}; TString tableName{input->Child(TGenSourceSettings::idx_Table)->Content()}; + THashSet<TStringBuf> columnSet; + for (const auto& child : input->Child(TGenSourceSettings::idx_Columns)->Children()) { + columnSet.insert(child->Content()); + } + auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos())); if (issue.has_value()) { ctx.AddError(issue.value()); @@ -64,18 +69,18 @@ namespace NYql { const auto structExprType = tableMeta.value()->ItemType; for (const auto& item : structExprType->GetItems()) { - blockRowTypeItems.push_back( - ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType()))); + // Filter out columns that are not required in this query + if (columnSet.contains(item->GetName())) { + blockRowTypeItems.push_back( + ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType()))); + } } blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>( BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)))); const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems); - // Struct column order - YQL_CLOG(INFO, ProviderGeneric) - << "StructExprType column order:" - << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString(); + YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString(); auto streamExprType = ctx.MakeType<TStreamExprType>(typeAnnotationNode); input->SetTypeAnn(streamExprType); @@ -105,21 +110,21 @@ namespace NYql { return TStatus::Error; } - TMaybe<THashSet<TStringBuf>> columnsSet; + TMaybe<THashSet<TStringBuf>> columnSet; auto columns = input->Child(TGenReadTable::idx_Columns); if (!columns->IsCallable(TCoVoid::CallableName())) { if (!EnsureTuple(*columns, ctx)) { return TStatus::Error; } - columnsSet.ConstructInPlace(); + columnSet.ConstructInPlace(); for (auto& child : columns->Children()) { if (!EnsureAtom(*child, ctx)) { return TStatus::Error; } auto name = child->Content(); - if (!columnsSet->insert(name).second) { + if (!columnSet->insert(name).second) { ctx.AddError( TIssue(ctx.GetPosition(child->Pos()), TStringBuilder() << "Duplicated column name: " << name)); return TStatus::Error; @@ -139,13 +144,15 @@ namespace NYql { auto itemType = tableMeta.value()->ItemType; auto columnOrder = tableMeta.value()->ColumnOrder; - YQL_CLOG(INFO, ProviderGeneric) << "Custom column order:" << StateColumnOrderToString(columnOrder); + if (columnSet) { + YQL_CLOG(INFO, ProviderGeneric) << "custom column set" << ColumnSetToString(*columnSet.Get()); - if (columnsSet) { TVector<const TItemExprType*> items = itemType->GetItems(); - EraseIf(items, [&](const TItemExprType* item) { return !columnsSet->contains(item->GetName()); }); - EraseIf(columnOrder, [&](const TString& col) { return !columnsSet->contains(col); }); + EraseIf(items, [&](const TItemExprType* item) { return !columnSet->contains(item->GetName()); }); + EraseIf(columnOrder, [&](const TString& col) { return !columnSet->contains(col); }); itemType = ctx.MakeType<TStructExprType>(items); + + YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(itemType))->ToString(); } input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{ @@ -154,11 +161,11 @@ namespace NYql { return State_->Types->SetColumnOrder(*input, columnOrder, ctx); } - TString StateColumnOrderToString(const TVector<TString>& columns) { + TString ColumnOrderToString(const TVector<TString>& columns) { TStringBuilder sb; for (std::size_t i = 0; i < columns.size(); i++) { - sb << i << ": " << columns[i]; + sb << i << "=" << columns[i]; if (i != columns.size() - 1) { sb << ", "; } @@ -167,6 +174,21 @@ namespace NYql { return sb; } + TString ColumnSetToString(const THashSet<TStringBuf>& columnSet) { + TStringBuilder sb; + + std::size_t i = 0; + for (const auto key : columnSet) { + sb << i << "=" << key; + if (i != columnSet.size() - 1) { + sb << ", "; + } + i++; + } + + return sb; + } + private: TGenericState::TPtr State_; }; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 13123410d5e..623bcad3fe6 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -41,8 +41,6 @@ namespace NYql { if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) { const auto genReadTable = maybeGenReadTable.Cast(); const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue(); - YQL_CLOG(INFO, ProviderGeneric) << "Wrap " << read->Content() << " with token: " << token; - const auto rowType = genReadTable.Ref() .GetTypeAnn() ->Cast<TTupleExprType>() @@ -131,11 +129,11 @@ namespace NYql { for (size_t i = 0; i < columns.Size(); i++) { // assign column name auto column = items->Add()->mutable_column(); - auto column_name = columns.Item(i).StringValue(); - column->mutable_name()->assign(column_name); + auto columnName = columns.Item(i).StringValue(); + column->mutable_name()->assign(columnName); // assign column type - auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, column_name); + auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, columnName); column->mutable_type()->CopyFrom(type); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index f3f2679b7a8..bc53eb51e43 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -243,7 +243,7 @@ namespace NYql { items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation)); columnOrder.emplace_back(columns.Get(i).name()); } - // FIXME: handle on Generic's side? + // FIXME: handle on Connector's side? return std::make_pair(ctx.MakeType<TStructExprType>(items), TString("Europe/Moscow")); } catch (std::exception&) { ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage())); |
