diff options
author | mshulb <mshulb@yandex-team.com> | 2023-11-02 12:44:17 +0300 |
---|---|---|
committer | mshulb <mshulb@yandex-team.com> | 2023-11-02 13:08:01 +0300 |
commit | 2ed98c75a8679c51dfa9a86494a75a0ce2c92751 (patch) | |
tree | c92e630cace4ae36d67896fb9bf0e42bd36abc44 | |
parent | aa14f36616cf44694a09a0644717c76f91d8f917 (diff) | |
download | ydb-2ed98c75a8679c51dfa9a86494a75a0ce2c92751.tar.gz |
KIKIMR-19695 Add initial support for DROP INDEX: parsing and preparing for proxy.
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_expr_nodes.json | 12 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider_impl.h | 17 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 42 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.cpp | 23 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.h | 12 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql.cpp | 107 |
12 files changed, 282 insertions, 22 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index ca56c8aac0..0540e7a542 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -575,7 +575,7 @@ public: errResult.AddIssue(NYql::TIssue(error)); errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); tablePromise.SetValue(errResult); - return tablePromise.GetFuture(); + return tablePromise.GetFuture(); } TGenericResult result; result.SetSuccess(); @@ -631,7 +631,7 @@ public: CHECK_PREPARED_DDL(AlterTable); auto tablePromise = NewPromise<TGenericResult>(); - + if (!IsPrepare()) { SessionCtx->Query().PrepareOnly = false; if (SessionCtx->Query().PreparingQuery) { @@ -711,7 +711,7 @@ public: auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); auto& phyTx = *phyQuery.AddTransactions(); phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); - + phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx); phyTx.MutableSchemeOperation()->MutableDropTable()->SetSuccessOnNotExist(settings.SuccessOnNotExist); diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index ccea19fd61..52e7652c3e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -163,6 +163,12 @@ private: return TStatus::Error; } + TStatus HandlePgDropObject(TPgDropObject node, TExprContext& ctx) override { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() + << "PgDropObject is not yet implemented for intent determination transformer")); + return TStatus::Error; + } + static void HandleDropTable(TIntrusivePtr<TKikimrSessionContext>& ctx, const NCommon::TWriteTableSettings& settings, const TKikimrKey& key, const TStringBuf& cluster) { @@ -288,6 +294,8 @@ private: return TStatus::Ok; case TKikimrKey::Type::Permission: return TStatus::Ok; + case TKikimrKey::Type::PGObject: + return TStatus::Ok; } ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid table key type.")); @@ -471,6 +479,10 @@ public: return true; } + if(node.IsCallable(TPgDropObject::CallableName())) { + return true; + } + if (auto maybeRight = TMaybeNode<TCoNth>(&node).Tuple().Maybe<TCoRight>()) { if (maybeRight.Input().Maybe<TKiExecDataQuery>()) { return true; @@ -507,6 +519,23 @@ public: .Ptr(); } + static TExprNode::TPtr MakePgDropObject(const TExprNode::TPtr& node, const NCommon::TPgObjectSettings& settings, + const TKikimrKey& key, TExprContext& ctx) + { + bool missingOk = (settings.IfExists.Cast().Value() == "true"); + + return Build<TPgDropObject>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .ObjectId().Build(key.GetPGObjectId()) + .TypeId().Build(key.GetPGObjectType()) + .MissingOk<TCoAtom>() + .Value(missingOk) + .Build() + .Done() + .Ptr(); + } + bool RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx, TExprNode::TPtr& resultNode) { TKiDataSink dataSink(node->ChildPtr(1)); auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath()); @@ -942,6 +971,19 @@ public: } break; } + + case TKikimrKey::Type::PGObject: { + NCommon::TPgObjectSettings settings = NCommon::ParsePgObjectSettings(TExprList(node->Child(4)), ctx); + + YQL_ENSURE(settings.Mode); + auto mode = settings.Mode.Cast(); + + if (mode == "dropIndex") { + return MakePgDropObject(node, settings, key, ctx); + } else { + YQL_ENSURE(false, "unknown PGObject mode \"" << TString(mode) << "\""); + } + } } ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Failed to rewrite IO.")); @@ -1076,6 +1118,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleDropGroup(node.Cast(), ctx); } + if(auto node = TMaybeNode<TPgDropObject>(input)) { + return HandlePgDropObject(node.Cast(), ctx); + } + if (input->IsCallable(WriteName)) { return HandleWrite(TExprBase(input), ctx); } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 8707c494cb..4c5af6a457 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -133,6 +133,8 @@ private: return TStatus::Ok; case TKikimrKey::Type::Permission: return TStatus::Ok; + case TKikimrKey::Type::PGObject: + return TStatus::Ok; } return TStatus::Error; diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 43f6c30d86..e8193f7c53 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1834,6 +1834,29 @@ public: }, "Executing DROP GROUP"); } + if (auto maybePgDropObject = TMaybeNode<TPgDropObject>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + auto pgDrop = maybePgDropObject.Cast(); + + auto cluster = TString(pgDrop.DataSink().Cluster()); + const auto type = TString(pgDrop.TypeId().Value()); + const auto objectName = TString(pgDrop.ObjectId().Value()); + + if (type == "pgIndex") { + // TODO: KIKIMR-19695 + ctx.AddError(TIssue(ctx.GetPosition(pgDrop.Pos()), + TStringBuilder() << "DROP INDEX for Postgres indexes is not implemented yet")); + return SyncError(); + } else { + ctx.AddError(TIssue(ctx.GetPosition(pgDrop.TypeId().Pos()), + TStringBuilder() << "Unknown PgDrop operation: " << type)); + return SyncError(); + } + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Failed to execute node: " << input->Content())); return SyncError(); diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index f386437d7a..799161c26e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -381,6 +381,18 @@ {"Index": 4, "Name": "Pathes", "Type": "TCoAtomList"}, {"Index": 5, "Name": "Roles", "Type": "TCoAtomList"} ] + }, + { + "Name": "TPgDropObject", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "PgDropObject!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "ObjectId", "Type": "TCoAtom"}, + {"Index": 3, "Name": "TypeId", "Type": "TCoAtom"}, + {"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"} + ] } ] } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index ffaa3f4594..b7928581df 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -62,6 +62,7 @@ struct TKikimrData { DataSinkNames.insert(TKiDataQueryBlocks::CallableName()); DataSinkNames.insert(TKiExecDataQuery::CallableName()); DataSinkNames.insert(TKiEffects::CallableName()); + DataSinkNames.insert(TPgDropObject::CallableName()); CommitModes.insert(CommitModeFlush); CommitModes.insert(CommitModeRollback); @@ -397,6 +398,10 @@ bool TKikimrKey::Extract(const TExprNode& key) { } else if(tagName == "permission") { KeyType = Type::Permission; Target = key.Child(0)->Child(1)->Child(0)->Content(); + } else if (tagName == "pgObject") { + KeyType = Type::PGObject; + Target = key.Child(0)->Child(1)->Child(0)->Content(); + ObjectType = key.Child(0)->Child(2)->Child(0)->Content(); } else { Ctx.AddError(TIssue(Ctx.GetPosition(key.Child(0)->Pos()), TString("Unexpected tag for kikimr key: ") + tagName)); return false; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 236952ae9c..25281b7a68 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -60,6 +60,7 @@ private: virtual TStatus HandleDataQueryBlocks(NNodes::TKiDataQueryBlocks node, TExprContext& ctx) = 0; virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) = 0; virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0; + virtual TStatus HandlePgDropObject(NNodes::TPgDropObject node, TExprContext& ctx) = 0; virtual TStatus HandleModifyPermissions(NNodes::TKiModifyPermissions node, TExprContext& ctx) = 0; }; @@ -73,7 +74,8 @@ public: Role, Object, Topic, - Permission + Permission, + PGObject }; struct TViewDescription { @@ -137,6 +139,19 @@ public: return Target; } + const TString& GetPGObjectId() const { + Y_DEBUG_ABORT_UNLESS(KeyType.Defined()); + Y_DEBUG_ABORT_UNLESS(KeyType == Type::PGObject); + return Target; + } + + const TString& GetPGObjectType() const { + Y_DEBUG_ABORT_UNLESS(KeyType.Defined()); + Y_DEBUG_ABORT_UNLESS(ObjectType.Defined()); + Y_DEBUG_ABORT_UNLESS(KeyType == Type::PGObject); + return *ObjectType; + } + bool Extract(const TExprNode& key); private: diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 06a55c8844..d87557a0b0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -215,6 +215,10 @@ private: { return TStatus::Ok; } + case TKikimrKey::Type::PGObject: + { + return TStatus::Ok; + } } return TStatus::Error; @@ -1568,6 +1572,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over return TStatus::Ok; } + virtual TStatus HandlePgDropObject(TPgDropObject node, TExprContext& /*ctx*/) override { + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + virtual TStatus HandleWrite(TExprBase node, TExprContext& ctx) override { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Failed to annotate Write!, IO rewrite should handle this")); return TStatus::Error; diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index beb3b91e17..593794c174 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1611,6 +1611,48 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(DropIndex) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);; + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + + auto client = kikimr.GetQueryClient(); + auto session = client.GetSession().GetValueSync().GetSession(); + const auto txCtrl = NYdb::NQuery::TTxControl::NoTx(); + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE test2( + id int8, + fk int8, + value char, + primary key(id) + ); + CREATE INDEX "test2_fk_idx" ON test2 (fk); + CREATE INDEX "test2_fk.idx_cover" ON test2 (fk) INCLUDE(value); + )"); + + auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const auto query = Q_(R"( + --!syntax_pg + DROP INDEX "test2_fk_idx"; + )"); + + auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync(); + // TODO: KIKIMR-19695 + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + // TODO: test with <schema>.<name>: "DROP INDEX test2_fk.idx_cover;" + } + Y_UNIT_TEST(CreateUniqPgColumn) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); auto client = kikimr.GetTableClient(); diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 669aec0f42..42775a6e49 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -615,6 +615,29 @@ TCommitSettings ParseCommitSettings(NNodes::TCoCommit node, TExprContext& ctx) { return ret; } +TPgObjectSettings ParsePgObjectSettings(NNodes::TExprList node, TExprContext&) { + TMaybeNode<TCoAtom> mode; + TMaybeNode<TCoAtom> ifExists; + for (auto child : node) { + if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) { + auto tuple = maybeTuple.Cast(); + auto name = tuple.Name().Value(); + + if (name == "mode") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + mode = tuple.Value().Cast<TCoAtom>(); + } else if (name == "ifExists") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + ifExists = tuple.Value().Cast<TCoAtom>(); + } + } + } + + TPgObjectSettings ret(std::move(mode), std::move(ifExists)); + return ret; +} + + TVector<TString> GetStructFields(const TTypeAnnotationNode* type) { TVector<TString> fields; if (type->GetKind() == ETypeAnnotationKind::List) { diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 8bc6777edb..14d66073a1 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -120,6 +120,16 @@ struct TCommitSettings bool EnsureOtherEmpty(TExprContext& ctx); }; +struct TPgObjectSettings +{ + NNodes::TMaybeNode<NNodes::TCoAtom> Mode; + NNodes::TMaybeNode<NNodes::TCoAtom> IfExists; + + TPgObjectSettings(NNodes::TMaybeNode<NNodes::TCoAtom>&& mode, NNodes::TMaybeNode<NNodes::TCoAtom>&& ifExists) + : Mode(std::move(mode)) + , IfExists(std::move(ifExists)) {} +}; + const TStructExprType* BuildCommonTableListType(TExprContext& ctx); TExprNode::TPtr BuildTypeExpr(TPositionHandle pos, const TTypeAnnotationNode& ann, TExprContext& ctx); @@ -138,6 +148,8 @@ TWritePermissionSettings ParseWritePermissionsSettings(NNodes::TExprList node, T TCommitSettings ParseCommitSettings(NNodes::TCoCommit node, TExprContext& ctx); +TPgObjectSettings ParsePgObjectSettings(NNodes::TExprList node, TExprContext& ctx); + TString FullTableName(const TStringBuf& cluster, const TStringBuf& table); IDataProvider::TFillSettings GetFillSettings(const TExprNode& node); diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 57d1949499..19dd471cef 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -166,6 +166,23 @@ const IndexElem* IndexElement(const Node* node) { #define AT_LOCATION_EX(node, field) \ TLocationGuard guard(this, node->field); +std::tuple<TStringBuf, TStringBuf> getSchemaAndObjectName(const List* nameList) { + switch (ListLength(nameList)) { + case 2: { + const auto clusterName = StrVal(ListNodeNth(nameList, 0)); + const auto tableName = StrVal(ListNodeNth(nameList, 1)); + return {clusterName, tableName}; + } + case 1: { + const auto tableName = StrVal(ListNodeNth(nameList, 0)); + return {"", tableName}; + } + default: { + return {"", ""}; + } + } +} + class TConverter : public IPGParseEvents { friend class TLocationGuard; @@ -1815,6 +1832,9 @@ public: case OBJECT_TABLE: { return ParseDropTableStmt(value, nameListNodes); } + case OBJECT_INDEX: { + return ParseDropIndexStmt(value, nameListNodes); + } default: { AddError("Not supported object type for DROP"); return nullptr; @@ -1858,24 +1878,7 @@ public: } for (const auto& nameList : names) { - const auto getSchemaAndTableName = [] (const List* nameList) -> std::tuple<TStringBuf, TStringBuf> { - switch (ListLength(nameList)) { - case 2: { - const auto clusterName = StrVal(ListNodeNth(nameList, 0)); - const auto tableName = StrVal(ListNodeNth(nameList, 1)); - return {clusterName, tableName}; - } - case 1: { - const auto tableName = StrVal(ListNodeNth(nameList, 0)); - return {"", tableName}; - } - default: { - return {"", ""}; - } - } - }; - - const auto [clusterName, tableName] = getSchemaAndTableName(nameList); + const auto [clusterName, tableName] = getSchemaAndObjectName(nameList); const auto [sink, key] = ParseQualifiedRelationName( /* catalogName */ "", clusterName, @@ -1904,6 +1907,47 @@ public: return Statements.back(); } + TAstNode* ParseDropIndexStmt(const DropStmt* value, const TVector<const List*>& names) { + if (value->behavior == DROP_CASCADE) { + AddError("CASCADE is not implemented"); + return nullptr; + } + + if (names.size() != 1) { + AddError("DROP INDEX requires exactly one index"); + return nullptr; + } + + for (const auto& nameList : names) { + const auto [clusterName, indexName] = getSchemaAndObjectName(nameList); + const auto [sink, key] = ParseQualifiedPgObjectName( + /* catalogName */ "", + clusterName, + indexName, + "pgIndex" + ); + + TString missingOk = (value->missing_ok) ? "true" : "false"; + Statements.push_back(L( + A("let"), + A("world"), + L( + A("Write!"), + A("world"), + sink, + key, + L(A("Void")), + QL( + QL(QA("mode"), QA("dropIndex")), + QL(QA("ifExists"), QA(missingOk)) + ) + ) + )); + } + + return Statements.back(); + } + [[nodiscard]] TAstNode* ParseVariableSetStmt(const VariableSetStmt* value) { if (value->kind != VAR_SET_VALUE) { @@ -2338,6 +2382,33 @@ public: return {sinkOrSource, key}; } + + TAstNode* BuildPgObjectExpression(const TStringBuf objectName, const TStringBuf objectType) { + return L(A("Key"), QL(QA("pgObject"), + L(A("String"), QA(objectName)), + L(A("String"), QA(objectType)) + )); + } + + TReadWriteKeyExprs ParseQualifiedPgObjectName(const TStringBuf catalogname, + const TStringBuf schemaname, + const TStringBuf objectName, + const TStringBuf pgObjectType) { + if (!catalogname.Empty()) { + AddError("catalogname is not supported"); + return {}; + } + if (objectName.Empty()) { + AddError("objectName should be specified"); + return {}; + } + + const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster; + const auto sinkOrSource = BuildClusterSinkOrSourceExpression(true, cluster); + const auto key = BuildPgObjectExpression(objectName, pgObjectType); + return {sinkOrSource, key}; + } + TReadWriteKeyExprs ParseWriteRangeVar(const RangeVar *value, bool isScheme = false) { if (value->alias) { |