aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-02-07 17:37:24 +0300
committerqrort <qrort@yandex-team.com>2023-02-07 17:37:24 +0300
commit47c51f98cc092d5d9aeed786b4c643cfa74f114e (patch)
treef0bdf6360b2174821effb66fe3b3c712b287fb26
parent6ac0a7ffffa73823c1316966afbcfd7aa86cb6fb (diff)
downloadydb-47c51f98cc092d5d9aeed786b4c643cfa74f114e.tar.gz
pg insert query support
pg insert query support
-rw-r--r--ydb/core/engine/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/engine/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/engine/CMakeLists.linux.txt1
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.cpp10
-rw-r--r--ydb/core/engine/mkql_keys.cpp67
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp4
-rw-r--r--ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp32
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.cpp9
-rw-r--r--ydb/core/kqp/runtime/kqp_runtime_impl.h2
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp174
-rw-r--r--ydb/core/scheme_types/scheme_raw_type_value.h1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux.txt2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp34
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp35
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h9
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.cpp16
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_columnorder.cpp49
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp56
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp34
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h1
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp10
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp3
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp1
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp6
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp66
31 files changed, 487 insertions, 148 deletions
diff --git a/ydb/core/engine/CMakeLists.darwin.txt b/ydb/core/engine/CMakeLists.darwin.txt
index 6773178a175..7b5c293de38 100644
--- a/ydb/core/engine/CMakeLists.darwin.txt
+++ b/ydb/core/engine/CMakeLists.darwin.txt
@@ -30,6 +30,7 @@ target_link_libraries(ydb-core-engine PUBLIC
library-yql-minikql
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-decimal
)
target_sources(ydb-core-engine PRIVATE
diff --git a/ydb/core/engine/CMakeLists.linux-aarch64.txt b/ydb/core/engine/CMakeLists.linux-aarch64.txt
index f14553b25bf..0cfa0bb69b5 100644
--- a/ydb/core/engine/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/engine/CMakeLists.linux-aarch64.txt
@@ -31,6 +31,7 @@ target_link_libraries(ydb-core-engine PUBLIC
library-yql-minikql
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-decimal
)
target_sources(ydb-core-engine PRIVATE
diff --git a/ydb/core/engine/CMakeLists.linux.txt b/ydb/core/engine/CMakeLists.linux.txt
index f14553b25bf..0cfa0bb69b5 100644
--- a/ydb/core/engine/CMakeLists.linux.txt
+++ b/ydb/core/engine/CMakeLists.linux.txt
@@ -31,6 +31,7 @@ target_link_libraries(ydb-core-engine PUBLIC
library-yql-minikql
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-decimal
)
target_sources(ydb-core-engine PRIVATE
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp
index b8326dc363c..044e3d3e5c8 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.cpp
+++ b/ydb/core/engine/minikql/minikql_engine_host.cpp
@@ -23,13 +23,12 @@ void ConvertTableKeys(const TScheme& scheme, const TScheme::TTableInfo* tableInf
for (size_t keyIdx = 0; keyIdx < row.size(); keyIdx++) {
const TCell& cell = row[keyIdx];
ui32 keyCol = tableInfo->KeyColumns[keyIdx];
- // TODO: support pg types
- NScheme::TTypeId vtype = scheme.GetColumnInfo(tableInfo, keyCol)->PType.GetTypeId();
+ NScheme::TTypeInfo vtypeInfo = scheme.GetColumnInfo(tableInfo, keyCol)->PType;
if (cell.IsNull()) {
key.emplace_back();
bytes += 1;
} else {
- key.emplace_back(cell.Data(), cell.Size(), vtype);
+ key.emplace_back(cell.Data(), cell.Size(), vtypeInfo);
bytes += cell.Size();
}
}
@@ -891,10 +890,9 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell
for (size_t i = 0; i < commands.size(); i++) {
const TUpdateCommand& upd = commands[i];
Y_VERIFY(upd.Operation == TKeyDesc::EColumnOperation::Set); // TODO[serxa]: support inplace update in update row
- // TODO: support pg types
- NScheme::TTypeId vtype = Scheme.GetColumnInfo(tableInfo, upd.Column)->PType.GetTypeId();
+ auto vtypeinfo = Scheme.GetColumnInfo(tableInfo, upd.Column)->PType;
ops.emplace_back(upd.Column, NTable::ECellOp::Set,
- upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtype));
+ upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtypeinfo));
valueBytes += upd.Value.IsNull() ? 1 : upd.Value.Size();
}
diff --git a/ydb/core/engine/mkql_keys.cpp b/ydb/core/engine/mkql_keys.cpp
index fb34be11586..d5cd54a32d7 100644
--- a/ydb/core/engine/mkql_keys.cpp
+++ b/ydb/core/engine/mkql_keys.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/core/base/domain.h>
#include <ydb/core/scheme_types/scheme_types_defs.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
#include <util/generic/maybe.h>
#include <util/generic/algorithm.h>
@@ -38,6 +39,19 @@ bool ExtractKeyData(TRuntimeNode valueNode, bool isOptional, NUdf::TUnboxedValue
return false;
}
+NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOptional) {
+ isOptional = false;
+ if (type->GetKind() == TType::EKind::Pg) {
+ auto pgType = static_cast<TPgType*>(type);
+ auto pgTypeId = pgType->GetTypeId();
+ return NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(pgTypeId));
+ } else {
+ auto dataType = UnpackOptionalData(type, isOptional);
+ return NScheme::TTypeInfo(dataType->GetSchemeType());
+ }
+}
+
+
template<typename T>
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
@@ -58,9 +72,8 @@ THolder<TKeyDesc> ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
for (ui32 i = 0; i < tuple->GetValuesCount(); ++i) {
auto type = tuple->GetType()->GetElementType(i);
bool isOptional;
- auto dataType = UnpackOptionalData(type, isOptional);
- // TODO: support pg types
- keyColumnTypes[i] = NScheme::TTypeInfo(dataType->GetSchemeType());
+ auto typeInfo = UnpackTypeInfo(type, isOptional);
+ keyColumnTypes[i] = typeInfo;
if (i != staticComponents) {
continue;
}
@@ -75,8 +88,6 @@ THolder<TKeyDesc> ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
}
++staticComponents;
- // TODO: support pg types
- Y_VERIFY(keyColumnTypes[i].GetTypeId() != NScheme::NTypeIds::Pg);
fromValues[i] = toValues[i] = MakeCell(keyColumnTypes[i], data, env);
}
@@ -95,11 +106,9 @@ void ExtractReadColumns(TStructType* columnsType, TStructLiteral* tags, TVector<
op.Column = columnId;
op.Operation = TKeyDesc::EColumnOperation::Read;
bool isOptional;
- auto dataType = UnpackOptionalData(columnsType->GetMemberType(i), isOptional);
- auto expectedType = (NScheme::TTypeId)dataType->GetSchemeType();
- MKQL_ENSURE(expectedType != 0, "Null type is not allowed");
- // TODO: support pg types
- op.ExpectedType = NScheme::TTypeInfo(expectedType);
+ auto typeInfo = UnpackTypeInfo(columnsType->GetMemberType(i), isOptional);
+ MKQL_ENSURE(typeInfo.GetTypeId() != 0, "Null type is not allowed");
+ op.ExpectedType = typeInfo;
}
}
@@ -145,10 +154,8 @@ THolder<TKeyDesc> ExtractSelectRange(TCallable& callable, const TTypeEnvironment
for (ui32 i = 0; i < fromTuple->GetValuesCount(); ++i) {
auto type = fromTuple->GetType()->GetElementType(i);
bool isOptional;
- auto dataType = UnpackOptionalData(type, isOptional);
- auto keyType = (NScheme::TTypeId)dataType->GetSchemeType();
- // TODO: support pg types
- keyColumnTypes[i] = NScheme::TTypeInfo(keyType);
+ auto typeInfo = UnpackTypeInfo(type, isOptional);
+ keyColumnTypes[i] = typeInfo;
auto valueNode = fromTuple->GetValue(i);
NUdf::TUnboxedValue data;
bool hasImmediateData = ExtractKeyData(valueNode, isOptional, data);
@@ -159,10 +166,8 @@ THolder<TKeyDesc> ExtractSelectRange(TCallable& callable, const TTypeEnvironment
for (ui32 i = 0; i < toTuple->GetValuesCount(); ++i) {
auto type = toTuple->GetType()->GetElementType(i);
bool isOptional;
- auto dataType = UnpackOptionalData(type, isOptional);
- auto keyType = (NScheme::TTypeId)dataType->GetSchemeType();
- // TODO: support pg types
- keyColumnTypes[i] = NScheme::TTypeInfo(keyType);
+ auto typeInfo = UnpackTypeInfo(type, isOptional);
+ keyColumnTypes[i] = typeInfo;
auto valueNode = toTuple->GetValue(i);
NUdf::TUnboxedValue data;
bool hasImmediateData = ExtractKeyData(valueNode, isOptional, data);
@@ -213,9 +218,8 @@ THolder<TKeyDesc> ExtractUpdateRow(TCallable& callable, const TTypeEnvironment&
auto valueNode = tuple->GetValue(1);
op.Operation = TKeyDesc::EColumnOperation::InplaceUpdate;
bool isOptional;
- auto keyType = (NScheme::TTypeId)UnpackOptionalData(valueNode, isOptional)->GetSchemeType();
- // TODO: support pg types
- op.ExpectedType = NScheme::TTypeInfo(keyType);
+ auto typeInfo = UnpackTypeInfo(valueNode.GetStaticType(), isOptional);
+ op.ExpectedType = typeInfo;
MKQL_ENSURE(!isOptional, "Expected data type for inplace update, not an optional");
op.InplaceUpdateMode = mode;
@@ -228,9 +232,8 @@ THolder<TKeyDesc> ExtractUpdateRow(TCallable& callable, const TTypeEnvironment&
// update
op.Operation = TKeyDesc::EColumnOperation::Set;
bool isOptional;
- auto keyType = (NScheme::TTypeId)UnpackOptionalData(cmd, isOptional)->GetSchemeType();
- // TODO: support pg types
- op.ExpectedType = NScheme::TTypeInfo(keyType);
+ auto typeInfo = UnpackTypeInfo(cmd.GetStaticType(), isOptional);
+ op.ExpectedType = typeInfo;
MKQL_ENSURE(op.ExpectedType.GetTypeId() != 0, "Null type is not allowed");
NUdf::TUnboxedValue data;
@@ -261,9 +264,6 @@ TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value, con
if (!value)
return TCell();
- // TODO: support pg types
- Y_VERIFY(type.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported");
-
switch(type.GetTypeId()) {
KNOWN_FIXED_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_CELL)
case NUdf::TDataType<NUdf::TDecimal>::Id:
@@ -276,9 +276,18 @@ TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value, con
break;
}
- const auto& ref = value.AsStringRef();
- if (!copy && !value.IsEmbedded() || value.IsString() || TCell::CanInline(ref.Size()))
+ TString binary;
+ NYql::NUdf::TStringRef ref;
+ bool isPg = (type.GetTypeId() == NScheme::NTypeIds::Pg);
+ if (isPg) {
+ binary = NYql::NCommon::PgValueToNativeBinary(value, NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()));
+ ref = NYql::NUdf::TStringRef(binary);
+ } else {
+ ref = value.AsStringRef();
+ }
+ if (!isPg && (!copy && !value.IsEmbedded() || value.IsString() || TCell::CanInline(ref.Size()))) {
return TCell(ref.Data(), ref.Size());
+ }
const auto& val = env.NewString(ref.Size());
std::memcpy(val.Data(), ref.Data(), ref.Size());
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 2c898b3a4ac..7f9a8de2838 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -491,6 +491,7 @@ bool RequireLookupPrecomputeStage(const TKqlLookupTable& lookup) {
// pass
} else if (tuple.Value().Maybe<TCoDataCtor>()) {
// TODO: support pg types
+ Y_ENSURE(tuple.Value().Ref().GetTypeAnn()->GetKind() != NYql::ETypeAnnotationKind::Pg);
auto slot = tuple.Value().Ref().GetTypeAnn()->Cast<TDataExprType>()->GetSlot();
auto typeId = NUdf::GetDataTypeInfo(slot).TypeId;
auto typeInfo = NScheme::TTypeInfo(typeId);
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 019f65a478f..bef11b58d6d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -366,7 +366,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
YQL_ENSURE(tablesData);
const auto& tableData = tablesData->ExistingTable(cluster, table);
YQL_ENSURE(tableData.Metadata);
-
+
THashSet<std::string_view> updateColumns;
const auto& updateStructType = update.Update().Ref().GetTypeAnn()->Cast<TStructExprType>();
for (const auto& item : updateStructType->GetItems()) {
@@ -874,7 +874,7 @@ TYdbOperation GetTableOp(const TKiWriteTable& write) {
return TYdbOperation::Replace;
} else if (mode == "insert_revert") {
return TYdbOperation::InsertRevert;
- } else if (mode == "insert_abort") {
+ } else if (mode == "insert_abort" || mode == "append") {
return TYdbOperation::InsertAbort;
} else if (mode == "delete_on") {
return TYdbOperation::DeleteOn;
diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
index 3f9224477d0..3aeb51828ca 100644
--- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
@@ -70,21 +70,29 @@ TSmallVec<bool> GetSkipNullKeys(const TKqpReadTableSettings& settings, const TKi
return skipNullKeys;
}
-NMiniKQL::TType* CreateColumnType(NUdf::TDataTypeId typeId, const TKqlCompileContext& ctx) {
+NMiniKQL::TType* CreateColumnType(const NKikimr::NScheme::TTypeInfo& typeInfo, const TKqlCompileContext& ctx) {
+ auto typeId = typeInfo.GetTypeId();
if (typeId == NUdf::TDataType<NUdf::TDecimal>::Id) {
return ctx.PgmBuilder().NewDecimalType(22, 9);
+ } else if (typeId == NKikimr::NScheme::NTypeIds::Pg) {
+ return ctx.PgmBuilder().NewPgType(NPg::PgTypeIdFromTypeDesc(typeInfo.GetTypeDesc()));
} else {
return ctx.PgmBuilder().NewDataType(typeId);
}
}
-void ValidateColumnType(const TTypeAnnotationNode* type, NUdf::TDataTypeId columnTypeId) {
+void ValidateColumnType(const TTypeAnnotationNode* type, NKikimr::NScheme::TTypeId columnTypeId) {
YQL_ENSURE(type);
bool isOptional;
- const TDataExprType* dataType = nullptr;
- YQL_ENSURE(IsDataOrOptionalOfData(type, isOptional, dataType));
- auto schemeType = NUdf::GetDataTypeInfo(dataType->GetSlot()).TypeId;
- YQL_ENSURE(schemeType == columnTypeId);
+ if (columnTypeId == NKikimr::NScheme::NTypeIds::Pg) {
+ const TPgExprType* pgType = nullptr;
+ YQL_ENSURE(IsPg(type, pgType));
+ } else {
+ const TDataExprType* dataType = nullptr;
+ YQL_ENSURE(IsDataOrOptionalOfData(type, isOptional, dataType));
+ auto schemeType = NUdf::GetDataTypeInfo(dataType->GetSlot()).TypeId;
+ YQL_ENSURE(schemeType == columnTypeId);
+ }
}
void ValidateColumnsType(const TStreamExprType* streamType, const TKikimrTableMetadata& tableMeta) {
@@ -95,9 +103,7 @@ void ValidateColumnsType(const TStreamExprType* streamType, const TKikimrTableMe
auto columnData = tableMeta.Columns.FindPtr(member->GetName());
YQL_ENSURE(columnData);
auto columnDataType = columnData->TypeInfo.GetTypeId();
- // TODO: support pg types
- YQL_ENSURE(columnDataType != 0 && columnDataType != NScheme::NTypeIds::Pg);
-
+ YQL_ENSURE(columnDataType != 0);
ValidateColumnType(member->GetItemType(), columnDataType);
}
}
@@ -110,8 +116,7 @@ void ValidateRangeBoundType(const TTupleExprType* keyTupleType, const TKikimrTab
auto columnData = tableMeta.Columns.FindPtr(tableMeta.KeyColumnNames[i]);
YQL_ENSURE(columnData);
auto columnDataType = columnData->TypeInfo.GetTypeId();
- // TODO: support pg types
- YQL_ENSURE(columnDataType != 0 && columnDataType != NScheme::NTypeIds::Pg);
+ YQL_ENSURE(columnDataType != 0);
ValidateColumnType(keyTupleType->GetItems()[i]->Cast<TOptionalExprType>()->GetItemType(), columnDataType);
}
@@ -152,10 +157,9 @@ TKqpKeyRange MakeKeyRange(const TKqlReadTableBase& readTable, const TKqlCompileC
auto columnData = tableMeta.Columns.FindPtr(keyColumn);
YQL_ENSURE(columnData);
auto columnDataType = columnData->TypeInfo.GetTypeId();
- // TODO: support pg types
- YQL_ENSURE(columnDataType != 0 && columnDataType != NScheme::NTypeIds::Pg);
- auto columnType = CreateColumnType(columnDataType, ctx);
+ YQL_ENSURE(columnDataType != 0);
+ auto columnType = CreateColumnType(columnData->TypeInfo, ctx);
if (fromTuple.ArgCount() > i) {
ValidateColumnType(fromTuple.Arg(i).Ref().GetTypeAnn(), columnDataType);
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp
index 9710d154ff2..4d10e536650 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_table.cpp
@@ -23,6 +23,15 @@ TTableId ParseTableId(const TRuntimeNode& node) {
return TTableId(ownerId, tableId, sysViewInfo, schemeVersion);
}
+bool StructHoldsPgType(const TStructType& structType, ui32 index) {
+ return (structType.GetMemberType(index)->GetKind() == TType::EKind::Pg);
+}
+
+NScheme::TTypeInfo UnwrapPgTypeFromStruct(const TStructType& structType, ui32 index) {
+ TPgType* type = AS_TYPE(TPgType, structType.GetMemberType(index));
+ return NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(type->GetTypeId()));
+}
+
NUdf::TDataTypeId UnwrapDataTypeFromStruct(const TStructType& structType, ui32 index) {
if (structType.GetMemberType(index)->GetKind() == TType::EKind::Optional) {
auto type = AS_TYPE(TDataType, AS_TYPE(TOptionalType, structType.GetMemberType(index))->GetItemType());
diff --git a/ydb/core/kqp/runtime/kqp_runtime_impl.h b/ydb/core/kqp/runtime/kqp_runtime_impl.h
index d1c504ef7c5..f9256c05df5 100644
--- a/ydb/core/kqp/runtime/kqp_runtime_impl.h
+++ b/ydb/core/kqp/runtime/kqp_runtime_impl.h
@@ -16,6 +16,8 @@ struct TKqpRangePartition {
TTableId ParseTableId(const NMiniKQL::TRuntimeNode& node);
NUdf::TDataTypeId UnwrapDataTypeFromStruct(const NMiniKQL::TStructType& structType, ui32 index);
+NScheme::TTypeInfo UnwrapPgTypeFromStruct(const NMiniKQL::TStructType& structType, ui32 index);
+bool StructHoldsPgType(const NMiniKQL::TStructType& structType, ui32 index);
NYql::NDq::IDqOutputConsumer::TPtr CreateOutputRangePartitionConsumer(
TVector<NYql::NDq::IDqOutput::TPtr>&& outputs, TVector<TKqpRangePartition>&& partitions,
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 1fba5803392..77c71977628 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -2,6 +2,9 @@
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <util/system/env.h>
+
extern "C" {
#include "postgres.h"
@@ -307,16 +310,17 @@ Y_UNIT_TEST_SUITE(KqpPg) {
bool isText,
std::function<TString(size_t)> textIn,
TString setTableName = "",
- ui16 rowCount = 10
+ ui16 rowCount = 10,
+ TVector <TString> colNames = {"key", "value"}
) {
TTableBuilder builder;
if (isKey) {
- builder.AddNullableColumn("key", makePgType(id));
+ builder.AddNullableColumn(colNames[0], makePgType(id));
} else {
- builder.AddNullableColumn("key", makePgType(INT2OID));
+ builder.AddNullableColumn(colNames[0], makePgType(INT2OID));
}
- builder.AddNullableColumn("value", makePgType(id));
- builder.SetPrimaryKeyColumn("key");
+ builder.AddNullableColumn(colNames[1], makePgType(id));
+ builder.SetPrimaryKeyColumn(colNames[0]);
auto tableName = (setTableName.empty()) ?
Sprintf("/Root/Pg%u_%s", id, isText ? "t" : "b") : setTableName;
@@ -332,15 +336,15 @@ Y_UNIT_TEST_SUITE(KqpPg) {
if (isKey) {
rows.AddListItem()
.BeginStruct()
- .AddMember("key").Pg(TPgValue(mode, str, makePgType(id)))
- .AddMember("value").Pg(TPgValue(mode, str, makePgType(id)))
+ .AddMember(colNames[0]).Pg(TPgValue(mode, str, makePgType(id)))
+ .AddMember(colNames[1]).Pg(TPgValue(mode, str, makePgType(id)))
.EndStruct();
} else {
auto int2Str = NPg::PgNativeBinaryFromNativeText(Sprintf("%u", i), INT2OID).Str;
rows.AddListItem()
.BeginStruct()
- .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID)))
- .AddMember("value").Pg(TPgValue(mode, str, makePgType(id)))
+ .AddMember(colNames[0]).Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID)))
+ .AddMember(colNames[1]).Pg(TPgValue(mode, str, makePgType(id)))
.EndStruct();
}
}
@@ -350,8 +354,8 @@ Y_UNIT_TEST_SUITE(KqpPg) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
auto readSettings = TReadTableSettings()
- .AppendColumns("key")
- .AppendColumns("value");
+ .AppendColumns(colNames[0])
+ .AppendColumns(colNames[1]);
auto it = session.ReadTable(tableName, readSettings).GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), result.GetIssues().ToString());
@@ -359,7 +363,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
};
Y_UNIT_TEST(CreateTableBulkUpsertAndRead) {
- TKikimrRunner kikimr;
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto testSingleType = [&kikimr] (ui32 id, bool isKey, bool isText,
std::function<TString(size_t)> textIn,
@@ -455,7 +459,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
Y_UNIT_TEST(EmptyQuery) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
auto result = client.ExecuteYqlScript(R"(
@@ -467,7 +471,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
Y_UNIT_TEST(NoTableQuery) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
auto result = client.ExecuteYqlScript(R"(
@@ -489,7 +493,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
Y_UNIT_TEST(TableSelect) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto testSingleType = [&kikimr] (ui32 id, bool isKey,
std::function<TString(size_t)> textIn,
std::function<TString(size_t)> textOut)
@@ -548,9 +552,9 @@ Y_UNIT_TEST_SUITE(KqpPg) {
[] (auto i) { return Sprintf("bytea %u", i); },
[] (auto i) { return Sprintf("\\x627974656120%x", i + 48); });
- testSingleType(BYTEAARRAYOID, false,
- [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); },
- [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); });
+ // testSingleType(BYTEAARRAYOID, false,
+ // [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); },
+ // [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); });
};
testByteaType();
for (const auto& [oid, spec] : typeSpecs) {
@@ -567,12 +571,144 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
Y_UNIT_TEST(CreateNotNullPgColumn) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
TTableBuilder builder;
UNIT_ASSERT_EXCEPTION(builder.AddNonNullableColumn("key", makePgType(INT2OID)), yexception);
//add create table check here once create table YQL is supported
}
+
+ Y_UNIT_TEST(TableInsert) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+
+ auto testSingleType = [&kikimr] (ui32 id, bool isKey,
+ std::function<TString(size_t)> textIn,
+ std::function<TString(size_t)> textOut)
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ auto tableName = createTable(db, session, id, isKey, false, textIn, "", 0);
+ session.Close().GetValueSync();
+ NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+ auto valType = NYql::NPg::LookupType(id).Name;
+ auto keyType = (isKey) ? valType : "int2";
+ if (id == BITOID) {
+ valType.append("(4)");
+ }
+ for (size_t i = 0; i < ((id == BOOLOID) ? 2 : 10); i++) {
+ auto keyIn = (isKey) ? textIn(i) : ToString(i);
+ TString req = TStringBuilder() << R"(
+ --!syntax_pg
+ INSERT INTO ")" << tableName << "\" (key, value) VALUES ('"
+ << keyIn << "'::" << keyType << ", '" << textIn(i) << "'::" << valType << ");";
+ Cerr << req << Endl;
+ auto result = client.ExecuteYqlScript(req).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ auto result = client.ExecuteYqlScript(
+ TStringBuilder() << R"(
+ --!syntax_pg
+ SELECT * FROM ")" << tableName << "\";"
+ ).GetValueSync();
+
+ TResultSetParser parser(result.GetResultSetParser(0));
+ for (size_t i = 0; parser.TryNextRow(); ++i) {
+ auto check = [&parser, &id] (const TString& column, const TString& expected) {
+ auto& c = parser.ColumnParser(column);
+ UNIT_ASSERT_VALUES_EQUAL(expected, c.GetPg().Content_);
+ };
+ auto expected = textOut(i);
+ if (isKey) {
+ check("key", expected);
+ }
+ check("value", expected);
+ }
+ };
+
+ auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec)
+ {
+ testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut);
+ };
+
+ auto testByteaType = [&] () {
+ testSingleType(BYTEAOID, true,
+ [] (auto i) { return Sprintf("bytea %u", i); },
+ [] (auto i) { return Sprintf("\\x627974656120%x", i + 48); });
+
+ // testSingleType(BYTEAARRAYOID, false,
+ // [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); },
+ // [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); });
+ };
+ testByteaType();
+ for (auto [oid, spec] : typeSpecs) {
+ Cerr << oid << Endl;
+ if (oid == CHAROID) {
+ continue;
+ // I cant come up with a query with explicit char conversion.
+ // ::char, ::character casts to pg_bpchar
+ }
+ if (oid == MONEYOID || oid == BITOID || oid == VARBITOID) {
+ spec.IsKey = false;
+ // Those types do not have HashProcId, so are not hashable,
+ // And we can not validate their uniqueness as keys in INSERT.
+ }
+ testType(oid, spec);
+ }
+ }
+
+ Y_UNIT_TEST(InsertFromSelect) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+
+ auto testSingleType = [&kikimr] (ui32 id, bool isKey,
+ std::function<TString(size_t)> textIn,
+ std::function<TString(size_t)> textOut)
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ auto tableName = createTable(db, session, id, isKey, false, textIn, "", 10, {"key1", "value1"});
+ TString emptyTableName = "/Root/PgEmpty" + ToString(id);
+ createTable(db, session, id, isKey, false, textIn, emptyTableName, 0);
+ session.Close().GetValueSync();
+ NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+ auto result = client.ExecuteYqlScript(
+ TStringBuilder() << R"(
+ --!syntax_pg
+ INSERT INTO ")" << emptyTableName << "\" (key, value) SELECT * FROM \"" << tableName << "\";"
+ ).GetValueSync();
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::INTERNAL_ERROR);
+ // result = client.ExecuteYqlScript(
+ // TStringBuilder() << R"(
+ // --!syntax_pg
+ // SELECT * FROM ")" << emptyTableName << "\";"
+ // ).GetValueSync();
+ // UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ // bool gotRows = false;
+ // TResultSetParser parser(result.GetResultSetParser(0));
+ // for (size_t i = 0; parser.TryNextRow(); ++i) {
+ // gotRows = true;
+ // auto check = [&parser, &id] (const TString& column, const TString& expected) {
+ // auto& c = parser.ColumnParser(column);
+ // UNIT_ASSERT_VALUES_EQUAL(expected, c.GetPg().Content_);
+ // };
+ // auto expected = textOut(i);
+ // if (isKey) {
+ // check("key", expected);
+ // }
+ // check("value", expected);
+ // Cerr << expected << Endl;
+ // }
+ // Y_ENSURE(gotRows, "Empty select");
+ };
+
+ auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec)
+ {
+ testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut);
+ };
+
+ testType(INT2OID, typeSpecs[INT2OID]);
+ testType(DATEOID, typeSpecs[DATEOID]);
+ }
}
} // namespace NKqp
diff --git a/ydb/core/scheme_types/scheme_raw_type_value.h b/ydb/core/scheme_types/scheme_raw_type_value.h
index 4dea1ceda4f..04b1d1f667a 100644
--- a/ydb/core/scheme_types/scheme_raw_type_value.h
+++ b/ydb/core/scheme_types/scheme_raw_type_value.h
@@ -53,6 +53,7 @@ public:
TString ToString() const {
TStringBuilder builder;
// TODO: support pg types
+ Y_ENSURE(ValueType.GetTypeId() != NScheme::NTypeIds::Pg);
builder << "(type:" << ValueType.GetTypeId();
if (!IsEmpty()) {
builder << ", value:" << TString((const char*)Buffer, BufferSize).Quote();
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt
index c602f3fecce..4d22f11c182 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt
@@ -74,6 +74,7 @@ target_link_libraries(core-tx-datashard PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
@@ -330,6 +331,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index c6bd568c40b..aec50fc79a0 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -75,6 +75,7 @@ target_link_libraries(core-tx-datashard PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
@@ -332,6 +333,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt
index c6bd568c40b..aec50fc79a0 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux.txt
@@ -75,6 +75,7 @@ target_link_libraries(core-tx-datashard PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
@@ -332,6 +333,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
ydb-library-aclib
ydb-library-binary_json
ydb-library-dynumber
+ parser-pg_wrapper-interface
api-protos
lib-deprecated-kicli
dq-actors-compute
diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
index d72063a50e0..c0b804b8f80 100644
--- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
@@ -30,13 +30,19 @@ void ValidateLookupKeys(const TType* inputType, const THashMap<TString, NScheme:
for (ui32 i = 0; i < rowType->GetMembersCount(); ++i) {
auto name = rowType->GetMemberName(i);
- auto dataType = NKqp::UnwrapDataTypeFromStruct(*rowType, i);
auto columnType = keyColumns.FindPtr(name);
MKQL_ENSURE_S(columnType);
-
- // TODO: support pg types
- MKQL_ENSURE_S(dataType == columnType->GetTypeId(), "Key column type mismatch, column: " << name);
+ if (NKqp::StructHoldsPgType(*rowType, i)) {
+ auto pgTypeInfo = NKqp::UnwrapPgTypeFromStruct(*rowType, i);
+ MKQL_ENSURE_S(
+ NPg::PgTypeIdFromTypeDesc(pgTypeInfo.GetTypeDesc()) == NPg::PgTypeIdFromTypeDesc(columnType->GetTypeDesc()),
+ "Key column type mismatch, column: " << name
+ );
+ } else {
+ auto dataTypeId = NKqp::UnwrapDataTypeFromStruct(*rowType, i);
+ MKQL_ENSURE_S(dataTypeId == columnType->GetTypeId(), "Key column type mismatch, column: " << name);
+ }
}
}
@@ -65,17 +71,17 @@ TParseLookupTableResult ParseLookupTable(TCallable& callable) {
auto keyTypes = AS_TYPE(TStructType, AS_TYPE(TStreamType, keysNode.GetStaticType())->GetItemType());
result.KeyTypes.resize(keyTypes->GetMembersCount());
for (ui32 i = 0; i < result.KeyTypes.size(); ++i) {
- // TODO: support pg types
- if (keyTypes->GetMemberType(i)->IsOptional()) {
- auto type = AS_TYPE(TOptionalType, keyTypes->GetMemberType(i))->GetItemType();
- MKQL_ENSURE(type->GetKind() != TType::EKind::Pg, "pg types are not supported");
- auto dataType = AS_TYPE(TDataType, type);
- result.KeyTypes[i] = NScheme::TTypeInfo(dataType->GetSchemeType());
+ NKikimr::NMiniKQL::TType* type = keyTypes->GetMemberType(i);
+ if (type->GetKind() == TType::EKind::Pg) {
+ auto itemType = AS_TYPE(TPgType, type);
+ result.KeyTypes[i] = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(itemType->GetTypeId()));
} else {
- auto type = keyTypes->GetMemberType(i);
- MKQL_ENSURE(type->GetKind() != TType::EKind::Pg, "pg types are not supported");
- auto dataType = AS_TYPE(TDataType, type);
- result.KeyTypes[i] = NScheme::TTypeInfo(dataType->GetSchemeType());
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, keyTypes->GetMemberType(i))->GetItemType();
+ }
+ Y_ENSURE(type->GetKind() == TType::EKind::Data);
+ auto itemType = AS_TYPE(TDataType, type);
+ result.KeyTypes[i] = NScheme::TTypeInfo(itemType->GetSchemeType());
}
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
index 4719669ead4..b1b25c9270c 100644
--- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
#include <util/generic/cast.h>
@@ -67,11 +68,15 @@ public:
NUdf::TUnboxedValue value = Row.GetElement(rowIndex);
if (value) {
- // TODO: support pg types
- Y_VERIFY(type.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported");
- auto slot = NUdf::GetDataSlot(type.GetTypeId());
- MKQL_ENSURE(IsValidValue(slot, value),
- "Malformed value for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value);
+ if (type.GetTypeId() != NScheme::NTypeIds::Pg) {
+ auto slot = NUdf::GetDataSlot(type.GetTypeId());
+ MKQL_ENSURE(IsValidValue(slot, value),
+ "Malformed value for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value);
+ } else {
+ Y_UNUSED(
+ NYql::NCommon::PgValueToNativeBinary(value, NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()))
+ );
+ }
}
// NOTE: We have to copy values here as some values inlined in TUnboxedValue
@@ -179,8 +184,11 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF
for (ui32 i = 0; i < rowTypes.size(); ++i) {
const auto& name = rowType->GetMemberName(i);
MKQL_ENSURE_S(inputIndex.emplace(name, i).second);
- // TODO: support pg types
- rowTypes[i] = NScheme::TTypeInfo(NKqp::UnwrapDataTypeFromStruct(*rowType, i));
+ if (NKqp::StructHoldsPgType(*rowType, i)) {
+ rowTypes[i] = NKqp::UnwrapPgTypeFromStruct(*rowType, i);
+ } else {
+ rowTypes[i] = NScheme::TTypeInfo(NKqp::UnwrapDataTypeFromStruct(*rowType, i));
+ }
}
TVector<ui32> keyIndices(tableInfo->KeyColumnIds.size());
@@ -189,9 +197,16 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF
auto it = inputIndex.find(columnInfo.Name);
MKQL_ENSURE_S(it != inputIndex.end());
- auto typeId = NKqp::UnwrapDataTypeFromStruct(*rowType, it->second);
- // TODO: support pg types
- MKQL_ENSURE_S(typeId == columnInfo.Type.GetTypeId(), "row key type missmatch with table key type");
+ if (NKqp::StructHoldsPgType(*rowType, i)) {
+ auto typeInfo = NKqp::UnwrapPgTypeFromStruct(*rowType, i);
+ MKQL_ENSURE_S(
+ NPg::PgTypeIdFromTypeDesc(typeInfo.GetTypeDesc()) == NPg::PgTypeIdFromTypeDesc(columnInfo.Type.GetTypeDesc()),
+ "row key type mismatch with table key type"
+ );
+ } else {
+ auto typeId = NKqp::UnwrapDataTypeFromStruct(*rowType, it->second);
+ MKQL_ENSURE_S(typeId == columnInfo.Type.GetTypeId(), "row key type mismatch with table key type");
+ }
keyIndices[i] = it->second;
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 4a463a5e74f..d6ea07490ee 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -337,9 +337,12 @@ private:
} else if (reqColumns.empty()) {
for (auto& [name, type] : SrcColumns) {
Ydb::Type ydbType;
- // TODO: support pg types
- Y_VERIFY(type.GetTypeId() != NScheme::NTypeIds::Pg);
- ydbType.set_type_id((Ydb::Type::PrimitiveTypeId)type.GetTypeId());
+ if (type.GetTypeId() == NScheme::NTypeIds::Pg) {
+ Ydb::PgType* pgType = ydbType.mutable_pg_type();
+ pgType->set_oid(NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()));
+ } else {
+ ydbType.set_type_id((Ydb::Type::PrimitiveTypeId)type.GetTypeId());
+ }
reqColumns.emplace_back(name, std::move(ydbType));
}
}
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
index 2612c50ba64..9747c537e64 100644
--- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -745,7 +745,12 @@ TExprNode::TPtr ExpandPositionalUnionAll(const TExprNode& node, const TVector<TC
return KeepColumnOrder(res, node, ctx, *optCtx.Types);
}
-TExprNode::TPtr BuildValues(TPositionHandle pos, const TExprNode::TPtr& values, TExprContext& ctx) {
+TExprNode::TPtr BuildValues(
+ TPositionHandle pos,
+ const TExprNode::TPtr& values,
+ const TExprNode::TPtr& targetColumns,
+ TExprContext& ctx
+) {
return ctx.Builder(pos)
.Callable("OrderedMap")
.Add(0, values->ChildPtr(2))
@@ -754,9 +759,12 @@ TExprNode::TPtr BuildValues(TPositionHandle pos, const TExprNode::TPtr& values,
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
for (ui32 index = 0; index < values->Child(1)->ChildrenSize(); ++index) {
+ TStringBuf alias = targetColumns
+ ? targetColumns->Child(1)->Child(index)->Content()
+ : values->Child(1)->Child(index)->Content();
parent
.List(index)
- .Atom(0, values->Child(1)->Child(index)->Content())
+ .Atom(0, alias)
.Callable(1, "Nth")
.Arg(0, "row")
.Atom(1, ToString(index))
@@ -2986,13 +2994,15 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct
auto sort = GetSetting(setItem->Tail(), "sort");
auto extraSortColumns = GetSetting(setItem->Tail(), "final_extra_sort_columns");
auto extraSortKeys = GetSetting(setItem->Tail(), "final_extra_sort_keys");
+ auto targetColumns = GetSetting(setItem->Tail(), "target_columns");
bool oneRow = !from;
TExprNode::TPtr list;
if (values) {
YQL_ENSURE(!result);
- list = BuildValues(node->Pos(), values, ctx);
+ list = BuildValues(node->Pos(), values, targetColumns, ctx);
} else {
YQL_ENSURE(result);
+ YQL_ENSURE(!targetColumns, "target columns for projection are not supported yet");
TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), ctx);
TExprNode::TPtr projectionArg = projectionLambda->Head().HeadPtr();
TExprNode::TPtr projectionRoot = projectionLambda->TailPtr();
diff --git a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
index 98485de1b73..3a6a71ced74 100644
--- a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
@@ -42,30 +42,39 @@ void AddPrefix(TVector<TString>& columnOrder, const TString& prefix) {
IGraphTransformer::TStatus OrderForPgSetItem(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
TVector<TString> columnOrder;
- auto result = GetSetting(node->Tail(), "result");
- if (result) {
- for (auto& col : result->Tail().ChildrenList()) {
- if (col->Head().IsAtom()) {
- auto alias = TString(col->Head().Content());
- YQL_ENSURE(!alias.empty());
- columnOrder.push_back(alias);
- }
- else {
- YQL_ENSURE(col->Head().IsList());
- for (const auto& x : col->Head().Children()) {
- auto alias = TString(x->Content());
+ if (auto targetColumnsOption = GetSetting(node->Tail(), "target_columns")) {
+ TExprNode::TPtr targetColumns = targetColumnsOption->Child(1);
+ for (const auto& targetColumn : targetColumns->ChildrenList()) {
+ columnOrder.emplace_back(targetColumn->Content());
+ }
+ } else {
+ auto result = GetSetting(node->Tail(), "result");
+ if (result) {
+ for (size_t i = 0; i < result->Tail().ChildrenSize(); i++) {
+ auto col = result->Tail().Child(i);
+ if (col->Head().IsAtom()) {
+ auto alias = TString(col->Head().Content());
YQL_ENSURE(!alias.empty());
columnOrder.push_back(alias);
}
+ else {
+ YQL_ENSURE(col->Head().IsList());
+ for (const auto& x : col->Head().Children()) {
+ auto alias = TString(x->Content());
+ YQL_ENSURE(!alias.empty());
+ columnOrder.push_back(alias);
+ }
+ }
+ }
+ } else {
+ auto values = GetSetting(node->Tail(), "values");
+ YQL_ENSURE(values);
+ TExprNode::TPtr valuesList = values->Child(1);
+ for (size_t i = 0; i < valuesList->ChildrenSize(); i++) {
+ auto alias = TString(valuesList->Child(i)->Content());
+ YQL_ENSURE(!alias.empty());
+ columnOrder.push_back(alias);
}
- }
- } else {
- auto values = GetSetting(node->Tail(), "values");
- YQL_ENSURE(values);
- for (const auto& x : values->Child(1)->Children()) {
- auto alias = TString(x->Content());
- YQL_ENSURE(!alias.empty());
- columnOrder.push_back(alias);
}
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index b28c68ee38f..7a9df807532 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -2430,7 +2430,7 @@ bool ValidateGroups(TInputs& inputs, const THashSet<TString>& possibleAliases,
if (scanColumnsOnly) {
continue;
}
-
+
bool hasNestedAggregations = false;
ScanAggregations(group->Tail().TailPtr(), hasNestedAggregations);
if (!allowAggregates && hasNestedAggregations) {
@@ -2793,6 +2793,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
bool hasFinalExtraSortColumns = false;
TExprNode::TPtr groupExprs;
TExprNode::TPtr result;
+ TExprNode::TPtr targetColumns;
// pass 0 - from/values
// pass 1 - join
@@ -2879,8 +2880,10 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
if (!EnsureAtom(*names->Child(i), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
-
- outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(names->Child(i)->Content(), tupleType->GetItems()[i]));
+ TStringBuf columnName = targetColumns
+ ? targetColumns->Child(i)->Content()
+ : names->Child(i)->Content();
+ outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(columnName, tupleType->GetItems()[i]));
}
outputRowType = ctx.Expr.MakeType<TStructExprType>(outputItems);
@@ -3070,11 +3073,22 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
}
else {
if (column->Head().IsAtom()) {
- outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(column->Head().Content(), column->Tail().GetTypeAnn()));
+ TStringBuf columnName = targetColumns
+ ? targetColumns->Child(index)->Content()
+ : column->Head().Content();
+ outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn()));
} else {
// star or qualified star
+ size_t index = 0;
for (const auto& item : column->Tail().GetTypeAnn()->Cast<TStructExprType>()->GetItems()) {
- outputItems.push_back(hasExtTypes ? item : RemoveAlias(item, ctx.Expr));
+ auto itemRef = hasExtTypes ? item : RemoveAlias(item, ctx.Expr);
+ if (targetColumns) {
+ itemRef = ctx.Expr.MakeType<TItemExprType>(
+ targetColumns->Child(index++)->Content(),
+ itemRef->GetItemType()
+ );
+ }
+ outputItems.push_back(itemRef);
}
}
@@ -3951,8 +3965,38 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "malformed projection_order"));
}
}
-
hasProjectionOrder = true;
+ } else if (optionName == "target_columns") {
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (pass == 0) {
+ if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& child : option->Tail().Children()) {
+ if (!EnsureAtom(*child, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ targetColumns = &option->Tail();
+ if (auto values = GetSetting(options, "values")) {
+ if (values->Child(1)->ChildrenSize() != targetColumns->ChildrenSize()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "values and target_options sizes do not match"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+ if (auto projectionOrder = GetSetting(options, "projection_order")) {
+ if (projectionOrder->ChildrenSize() != targetColumns->ChildrenSize()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "projection_order and target_options sizes do not match"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
} else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
TStringBuilder() << "Unsupported option: " << optionName));
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 27261aa35f0..9ace7186042 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -1791,6 +1791,34 @@ void EnsureAllNodesTypeAnnotated(const TExprNode& root) {
namespace {
+bool IsPg(
+ TPosition pos,
+ const TTypeAnnotationNode* typeAnnotation,
+ const TPgExprType*& pgType,
+ TIssue& err,
+ bool& hasErrorType)
+{
+ err = {};
+ hasErrorType = false;
+
+ if (!typeAnnotation) {
+ err = TIssue(pos, TStringBuilder() << "Expected pg, but got lambda");
+ return false;
+ }
+
+ if (typeAnnotation->GetKind() == ETypeAnnotationKind::Pg) {
+ pgType = typeAnnotation->Cast<TPgExprType>();
+ return true;
+ }
+
+ if (!HasError(typeAnnotation, err)) {
+ err = TIssue(pos, TStringBuilder() << "Expected pg, but got: " << *typeAnnotation);
+ } else {
+ hasErrorType = true;
+ }
+ return false;
+}
+
bool IsDataOrOptionalOfData(TPosition pos, const TTypeAnnotationNode* typeAnnotation, bool& isOptional,
const TDataExprType*& dataType, TIssue& err, bool& hasErrorType)
{
@@ -1847,6 +1875,12 @@ bool IsDataOrOptionalOfData(const TTypeAnnotationNode* typeAnnotation) {
return IsDataOrOptionalOfData(typeAnnotation, isOptional, dataType);
}
+bool IsPg(const TTypeAnnotationNode* typeAnnotation, const TPgExprType*& pgType) {
+ TIssue err;
+ bool hasErrorType;
+ return IsPg({}, typeAnnotation, pgType, err, hasErrorType);
+}
+
bool EnsureArgsCount(const TExprNode& node, ui32 expectedArgs, TExprContext& ctx) {
if (node.ChildrenSize() != expectedArgs) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected " << expectedArgs << " argument(s), but got " <<
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 50e15aee53c..65732456a6e 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -54,6 +54,7 @@ bool AreAllNodesTypeAnnotated(const TExprNode& root);
void EnsureAllNodesTypeAnnotated(const TExprNode& root);
bool IsDataOrOptionalOfData(const TTypeAnnotationNode* typeAnnotation, bool& isOptional, const TDataExprType*& dataType);
bool IsDataOrOptionalOfData(const TTypeAnnotationNode* typeAnnotation);
+bool IsPg(const TTypeAnnotationNode* typeAnnotation, const TPgExprType*& pgType);
bool UpdateLambdaAllArgumentsTypes(TExprNode::TPtr& lambda, const std::vector<const TTypeAnnotationNode*>& argumentsAnnotations, TExprContext& ctx);
bool UpdateLambdaArgumentsType(const TExprNode& lambda, TExprContext& ctx);
bool EnsureArgsCount(const TExprNode& node, ui32 expectedArgs, TExprContext& ctx);
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.darwin.txt b/ydb/library/yql/dq/runtime/CMakeLists.darwin.txt
index f2fbbe8be2f..799d8ce7891 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.darwin.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.darwin.txt
@@ -20,12 +20,12 @@ target_link_libraries(yql-dq-runtime PUBLIC
ydb-library-mkql_proto
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-udf
dq-actors-protos
yql-dq-common
yql-dq-expr_nodes
yql-dq-type_ann
- parser-pg_wrapper-interface
common-schema-mkql
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
index 2a90186ca3b..d3596b34849 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
@@ -21,12 +21,12 @@ target_link_libraries(yql-dq-runtime PUBLIC
ydb-library-mkql_proto
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-udf
dq-actors-protos
yql-dq-common
yql-dq-expr_nodes
yql-dq-type_ann
- parser-pg_wrapper-interface
common-schema-mkql
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux.txt
index 2a90186ca3b..d3596b34849 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.linux.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.linux.txt
@@ -21,12 +21,12 @@ target_link_libraries(yql-dq-runtime PUBLIC
ydb-library-mkql_proto
yql-minikql-comp_nodes
yql-minikql-computation
+ parser-pg_wrapper-interface
yql-public-udf
dq-actors-protos
yql-dq-common
yql-dq-expr_nodes
yql-dq-type_ann
- parser-pg_wrapper-interface
common-schema-mkql
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 383a5c0d164..2f7d1d02e36 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -12,6 +12,8 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
+
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <ydb/library/yql/public/udf/udf_value.h>
@@ -117,6 +119,14 @@ void ValidateParamValue(std::string_view paramName, const TType* type, const NUd
break;
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<const TPgType*>(type);
+ if (value) {
+ Y_UNUSED(NYql::NCommon::PgValueToNativeBinary(value, pgType->GetTypeId()));
+ }
+ break;
+ }
+
default:
YQL_ENSURE(false, "Unexpected value type in parameter"
<< ", parameter: " << paramName
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
index 6814e0e3475..c2f5d45b3a1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
@@ -781,8 +781,9 @@ public:
return ctx.HolderFactory.GetEmptyContainer();
}
+ bool UseICompare = UseIHash && std::is_same_v<TSetAccumulator, TSortedSetAccumulator>;
TSetAccumulator accumulator(KeyType, KeyTypes, IsTuple, Encoded,
- UseIHash ? MakeCompareImpl(KeyType) : nullptr,
+ UseICompare ? MakeCompareImpl(KeyType) : nullptr,
UseIHash ? MakeEquateImpl(KeyType) : nullptr,
UseIHash ? MakeHashImpl(KeyType) : nullptr,
ctx, itemsCountHint);
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 057ba99c4ed..68d20e58f1d 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -2,7 +2,6 @@
#include "mkql_computation_node_pack_impl.h"
#include "mkql_computation_node_holders.h"
#include "presort.h"
-
#include <ydb/library/yql/parser/pg_wrapper/interface/pack.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
#include <ydb/library/yql/public/decimal/yql_decimal_serialize.h>
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index 3d9e423a887..ae2ac5f2fd0 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -6,6 +6,7 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/mkql_node_printer.h>
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <array>
@@ -1299,7 +1300,7 @@ public:
NUdf::TType* Build() const override {
return NMiniKQL::TBlockType::Create(
- const_cast<NMiniKQL::TType*>(ItemType_),
+ const_cast<NMiniKQL::TType*>(ItemType_),
(IsScalar_ ? NMiniKQL::TBlockType::EShape::Scalar : NMiniKQL::TBlockType::EShape::Many),
Parent_.Env());
}
@@ -2056,7 +2057,8 @@ NUdf::ICompare::TPtr MakeCompareImpl(const NMiniKQL::TType* type) {
case NMiniKQL::TType::EKind::Pg:
return MakePgCompare((const TPgType*)type);
default:
- throw TTypeNotSupported() << "Data, Pg, Optional, Variant over Tuple, Tuple or List is expected for comparing";
+ throw TTypeNotSupported() << "Data, Pg, Optional, Variant over Tuple, Tuple or List is expected for comparing,"
+ << "but got: " << PrintNode(type);
}
}
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 3ab2e1a86e8..e76c407e6b1 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -206,6 +206,11 @@ public:
DqEngineForce = true;
}
}
+
+ for (const auto& [cluster, provider] : Settings.ClusterMapping) {
+ Provider = provider;
+ break;
+ }
}
void OnResult(const List* raw) {
@@ -275,7 +280,7 @@ public:
using TTraverseNodeStack = TStack<std::pair<const Node*, bool>>;
[[nodiscard]]
- TAstNode* ParseSelectStmt(const SelectStmt* value, bool inner) {
+ TAstNode* ParseSelectStmt(const SelectStmt* value, bool inner, TVector <TAstNode*> targetColumns = {}) {
CTE.emplace_back();
Y_DEFER {
CTE.pop_back();
@@ -698,6 +703,9 @@ public:
}
TVector<TAstNode*> setItemOptions;
+ if (targetColumns) {
+ setItemOptions.push_back(QL(QA("target_columns"), QVL(targetColumns.data(), targetColumns.size())));
+ }
if (ListLength(x->targetList) > 0) {
setItemOptions.push_back(QL(QA("result"), QVL(res.data(), res.size())));
} else {
@@ -876,11 +884,6 @@ public:
[[nodiscard]]
TAstNode* ParseInsertStmt(const InsertStmt* value) {
- if (ListLength(value->cols) > 0) {
- AddError("InsertStmt: target columns are not supported");
- return nullptr;
- }
-
if (!value->selectStmt) {
AddError("InsertStmt: expected Select");
return nullptr;
@@ -906,14 +909,45 @@ public:
return nullptr;
}
- auto select = ParseSelectStmt(CAST_NODE(SelectStmt, value->selectStmt), true);
+ TVector <TAstNode*> targetColumns;
+ if (value->cols) {
+ for (size_t i = 0; i < ListLength(value->cols); i++) {
+ auto node = ListNodeNth(value->cols, i);
+ if (NodeTag(node) != T_ResTarget) {
+ NodeNotImplemented(value, node);
+ return nullptr;
+ }
+ auto r = CAST_NODE(ResTarget, node);
+ if (!r->name) {
+ AddError("SelectStmt: expected name");
+ return nullptr;
+ }
+ targetColumns.push_back(QA(r->name));
+ }
+ }
+
+ auto select = ParseSelectStmt(CAST_NODE(SelectStmt, value->selectStmt), true, targetColumns);
if (!select) {
return nullptr;
}
- auto writeOptions = QL(QL(QA("mode"), QA("append")));
- Statements.push_back(L(A("let"), A("world"), L(A("Write!"),
- A("world"), insertDesc.Sink, insertDesc.Key, select, writeOptions)));
+ auto insertMode = (ProviderToInsertModeMap.contains(Provider))
+ ? ProviderToInsertModeMap.at(Provider)
+ : "append";
+
+ auto writeOptions = QL(QL(QA("mode"), QA(insertMode)));
+ Statements.push_back(L(
+ A("let"),
+ A("world"),
+ L(
+ A("Write!"),
+ A("world"),
+ insertDesc.Sink,
+ insertDesc.Key,
+ select,
+ writeOptions
+ )
+ ));
return Statements.back();
}
@@ -1157,11 +1191,6 @@ public:
return {};
}
- if (StrLength(value->schemaname) == 0) {
- AddError("schemaname should be specified");
- return {};
- }
-
if (StrLength(value->relname) == 0) {
AddError("relname should be specified");
return {};
@@ -2782,6 +2811,13 @@ private:
TVector<NYql::TPosition> Positions;
TVector<ui32> RowStarts;
ui32 QuerySize;
+ TString Provider;
+ static const THashMap<TStringBuf, TString> ProviderToInsertModeMap;
+};
+
+const THashMap<TStringBuf, TString> TConverter::ProviderToInsertModeMap = {
+ {NYql::KikimrProviderName, "insert_abort"},
+ {NYql::YtProviderName, "append"}
};
NYql::TAstParseResult PGToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings) {