aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-08-04 11:09:13 +0300
committerbbiff <bbiff@yandex-team.com>2022-08-04 11:09:13 +0300
commit7be82a9836184e110d7551a329383b89f9f6f19c (patch)
tree016ecfeca92892c00aa6f4b1d432d8bc9f17dcaf
parent12845e3c5d1e4c5c6aeb6d0cafdd7fd161533f11 (diff)
downloadydb-7be82a9836184e110d7551a329383b89f9f6f19c.tar.gz
ignoring compression bug
-rw-r--r--ydb/library/yql/providers/common/mkql/parser.cpp4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp31
2 files changed, 26 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp
index 426eb84498..d2ede218e6 100644
--- a/ydb/library/yql/providers/common/mkql/parser.cpp
+++ b/ydb/library/yql/providers/common/mkql/parser.cpp
@@ -33,8 +33,8 @@ std::array<TString, 2U> GetSettings(const TExprNode& settings) {
NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
writer.OpenMap();
child.Tail().ForEachChild([&writer, &compression](const TExprNode& pair) {
- if (pair.Head().IsAtom("compression") && pair.Tail().IsCallable({"String", "Utf8"}))
- if (const auto& comp = pair.Tail().Head().Content(); !comp.empty())
+ if (pair.Head().IsAtom("compression"))
+ if (const auto& comp = pair.Tail().Content(); !comp.empty())
compression = comp;
else {
writer.WriteKey(pair.Head().Content());
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index ea67ffd422..237f0d8238 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -74,12 +74,29 @@ public:
const auto rowType = pqReadTopic.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType();
const auto& clusterName = pqReadTopic.DataSource().Cluster().StringValue();
- const auto settings = Build<TCoNameValueTupleList>(ctx, pqReadTopic.Topic().Pos())
- .Add()
- .Name().Value("format").Build()
- .Value(pqReadTopic.Format())
- .Build()
- .Done();
+
+ TExprNode::TListType settings(1U,
+ ctx.Builder(pqReadTopic.Topic().Pos())
+ .List()
+ .Atom(0, "format", TNodeFlags::Default)
+ .Add(1, pqReadTopic.Format().Ptr())
+ .Seal().Build()
+ );
+
+ if (pqReadTopic.Compression() != "") {
+ settings.emplace_back(
+ ctx.Builder(pqReadTopic.Compression().Pos())
+ .List()
+ .Atom(0, "settings", TNodeFlags::Default)
+ .List(1)
+ .List(0)
+ .Atom(0, "compression")
+ .Atom(1, pqReadTopic.Compression())
+ .Seal()
+ .Seal()
+ .Seal().Build()
+ );
+ }
const auto token = "cluster:default_" + clusterName;
auto columns = pqReadTopic.Columns().Ptr();
@@ -103,7 +120,7 @@ public:
.Build()
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
- .Settings(settings)
+ .Settings(ctx.NewList(pqReadTopic.Topic().Pos(), std::move(settings)))
.Done().Ptr();
}
return read;