diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-10-13 16:28:25 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-10-13 17:11:47 +0300 |
commit | 6dbb54906effccb3979c234774e1b4d2b3ce83cc (patch) | |
tree | 55e55a09036751ce189e7d1c6abaf6b3c52dcd66 | |
parent | f0121e285828f6e7a7698036f3eac325f8e1c6ac (diff) | |
download | ydb-6dbb54906effccb3979c234774e1b4d2b3ce83cc.tar.gz |
[pg] IF NOT EXISTS support for CREATE INDEX
-rw-r--r-- | ydb/core/grpc_services/rpc_alter_table.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/local_rpc/helper.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/local_rpc/helper.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/core/protos/index_builder.proto | 4 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h | 7 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql.cpp | 12 |
14 files changed, 96 insertions, 37 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index fe01512b5aa..374864e0aa6 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -142,12 +142,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab } public: - enum EFlags : ui8 { - Default = 0, - PgMode = 1 - }; - - TAlterTableRPC(IRequestOpCtx* msg, ui8 flags = Default) + TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED) : TBase(msg) , Flags(flags) {} @@ -407,9 +402,12 @@ private: const auto& req = *GetProtoRequest(); NKikimrIndexBuilder::TIndexBuildSettings settings; - if (Flags & PgMode) { + if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) { settings.set_pg_mode(true); } + if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) { + settings.set_if_not_exist(true); + } settings.set_source_path(req.path()); auto tableIndex = settings.mutable_index(); tableIndex->CopyFrom(req.add_indexes(0)); @@ -435,7 +433,9 @@ private: << ", Id# " << response.GetIndexBuild().GetId()); if (status == Ydb::StatusIds::SUCCESS) { - if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) { + if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) { + Reply(status, issuesProto, ctx); + } else if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) { CreateSSOpSubscriber(SchemeshardId, TxId, DatabaseName, TOpType::BuildIndex, std::move(Request_), ctx); Die(ctx); } else { @@ -723,7 +723,7 @@ private: TTableProfiles Profiles; EOp OpType; - const ui8 Flags; + const ui64 Flags; }; void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { @@ -735,8 +735,8 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt return new TAlterTableRPC(msg); } -IActor* CreatePgAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { - return new TAlterTableRPC(msg, TAlterTableRPC::PgMode); +IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags) { + return new TAlterTableRPC(msg, flags); } diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 9eb5d1afc11..550fbfefe28 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -104,7 +104,7 @@ public: } auto cb = GetAlterTableRespHandler(); - DoAlterTableSameMailbox(std::move(*alter.MutableReq()), std::move(cb), + DoAlterTableSameMailbox(std::move(alter), std::move(cb), Database, token, type); Become(&TKqpSchemeExecuter::ExecuteState); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index ff52c4026f7..53e173e775f 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -798,8 +798,11 @@ public: } } - TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) override { + TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, + const TMaybe<TString>& requestType, ui64 flags) override + { try { + YQL_ENSURE(!flags); //Supported only for prepared mode if (!CheckCluster(cluster)) { return InvalidCluster<TGenericResult>(cluster); } diff --git a/ydb/core/kqp/gateway/local_rpc/helper.cpp b/ydb/core/kqp/gateway/local_rpc/helper.cpp index 4681d0f08ff..86a60a85460 100644 --- a/ydb/core/kqp/gateway/local_rpc/helper.cpp +++ b/ydb/core/kqp/gateway/local_rpc/helper.cpp @@ -8,14 +8,29 @@ namespace NKikimr { namespace NGRpcService { -IActor* CreatePgAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg); +IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags); -using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest, +class TExtendedAlterTableRequest : public Ydb::Table::AlterTableRequest { +public: + TExtendedAlterTableRequest(NKqpProto::TKqpSchemeOperation::TAlterTable&& alter) + : Ydb::Table::AlterTableRequest(std::move(*alter.MutableReq())) + , ExtendedFlags(alter.GetFlags()) + {} + + ui64 GetExtendedFlags() const { + return ExtendedFlags; + } +private: + const ui64 ExtendedFlags; +}; + +using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<TExtendedAlterTableRequest, Ydb::Table::AlterTableResponse>; struct TEvPgAlterTableRequest : public TEvAlterTableRequest { - static IActor* CreateRpcActor(IRequestOpCtx* msg) { - return CreatePgAlterTableRpcActor(msg); + static IActor* CreateRpcActor(IRequestOpCtx* ctx) { + auto request = static_cast<const TExtendedAlterTableRequest*>(ctx->GetRequest()); + return CreateExtAlterTableRpcActor(ctx, request->GetExtendedFlags()); } }; @@ -42,7 +57,7 @@ IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Operations:: } } -void DoAlterTableSameMailbox(Ydb::Table::AlterTableRequest&& req, TAlterTableRespHandler&& cb, +void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb, const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type) { NRpcService::DoLocalRpcSameMailbox<NGRpcService::TEvPgAlterTableRequest>(std::move(req), std::move(cb), diff --git a/ydb/core/kqp/gateway/local_rpc/helper.h b/ydb/core/kqp/gateway/local_rpc/helper.h index faeafd874b0..769a3fed644 100644 --- a/ydb/core/kqp/gateway/local_rpc/helper.h +++ b/ydb/core/kqp/gateway/local_rpc/helper.h @@ -8,7 +8,7 @@ namespace NKqp { NYql::IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Ydb::Operations::Operation& op); using TAlterTableRespHandler = std::function<void(const Ydb::Table::AlterTableResponse& r)>; -void DoAlterTableSameMailbox(Ydb::Table::AlterTableRequest&& req, TAlterTableRespHandler&& cb, +void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb, const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type); } diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index a3876f43bd0..12ddae56e06 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -544,7 +544,7 @@ public: } TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, - const TMaybe<TString>& requestType) override + const TMaybe<TString>& requestType, ui64 flags) override { CHECK_PREPARED_DDL(AlterTable); @@ -558,13 +558,15 @@ public: alter->SetType(*requestType); } + alter->SetFlags(flags); + auto promise = NewPromise<TGenericResult>(); TGenericResult result; result.SetSuccess(); promise.SetValue(result); return promise.GetFuture(); } else { - return Gateway->AlterTable(cluster, std::move(req), requestType); + return Gateway->AlterTable(cluster, std::move(req), requestType, flags); } } diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 82290e12d45..22abf675858 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1059,6 +1059,7 @@ public: return SyncError(); } + ui64 alterTableFlags = 0; //additional flags to pass options without public api modification Ydb::Table::AlterTableRequest alterTableRequest; alterTableRequest.set_path(table.Metadata->Name); @@ -1333,6 +1334,20 @@ public: TString columnName(column.Value()); add_index->add_data_columns(columnName); } + } else if (name == "flags") { + auto flagList = columnTuple.Item(1).Cast<TCoAtomList>(); + for (auto flag : flagList) { + TString flagName(flag.Value()); + if (flagName == "pg") { + alterTableFlags |= NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE; + } else if (flagName == "ifNotExists") { + alterTableFlags |= NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS; + } else { + ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), + TStringBuilder() << "Unknown add index flag: " << flagName)); + return SyncError(); + } + } } else { ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), TStringBuilder() << "Unknown add index setting: " << name)); @@ -1524,7 +1539,7 @@ public: if (!SessionCtx->Query().DocumentApiRestricted) { requestType = NKikimr::NDocApi::RequestType; } - future = Gateway->AlterTable(cluster, std::move(alterTableRequest), requestType); + future = Gateway->AlterTable(cluster, std::move(alterTableRequest), requestType, alterTableFlags); } return WrapFuture(future, diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 9fa5fe8dbdc..25a17a4b2de 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -762,7 +762,8 @@ public: virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0; - virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) = 0; + virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, + const TMaybe<TString>& requestType, ui64 flags) = 0; virtual NThreading::TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) = 0; diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index dade31fd308..11646910acf 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1537,17 +1537,25 @@ Y_UNIT_TEST_SUITE(KqpPg) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); } -/* { const auto query = Q_(R"( --!syntax_pg CREATE INDEX IF NOT EXISTS "test_fk_idx" ON test (fk); )"); - auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } -*/ + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE INDEX IF NOT EXISTS "test" ON test (fk); + )"); + + auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } } Y_UNIT_TEST(CreateUniqPgColumn) { diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto index 7f2d7d0dcbc..303fe51067a 100644 --- a/ydb/core/protos/index_builder.proto +++ b/ydb/core/protos/index_builder.proto @@ -21,7 +21,8 @@ message TIndexBuildSettings { optional string source_path = 1; optional Ydb.Table.TableIndex index = 2; optional TColumnBuildSettings column_build_operation = 7; - optional bool pg_mode = 8; + optional bool pg_mode = 8 [ default = false]; + optional bool if_not_exist = 9 [ default = false]; optional uint32 max_batch_rows = 3 [ default = 500 ]; optional uint64 max_batch_bytes = 4 [ default = 8388608 ]; @@ -49,6 +50,7 @@ message TEvCreateResponse { optional Ydb.StatusIds.StatusCode Status = 2; repeated Ydb.Issue.IssueMessage Issues = 3; optional TIndexBuild IndexBuild = 4; + optional sint32 SchemeStatus = 5 [default = -1]; //flat_tx_scheme.proto - enum EStatus, -1 is inexistent enum value } message TEvGetRequest { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index d1810e9c022..92ce2f435dc 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -372,9 +372,15 @@ message TKqpPhyResult { } message TKqpSchemeOperation { + enum EFlags { + FLAG_UNSPECIFIED = 0; + FLAG_PG_MODE = 1; // set if pg compatible mode + FLAG_IF_NOT_EXISTS = 2; // set if IF_NOT_EXISTS modificator present + }; message TAlterTable { Ydb.Table.AlterTableRequest Req = 1; string Type = 2; + uint64 Flags = 3; } oneof Operation { NKikimrSchemeOp.TModifyScheme CreateTable = 1; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index 560446ca560..2fa54dafd6c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -45,7 +45,7 @@ public: .NotUnderDomainUpgrade(); if (!checks) { - return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); + return Reply(checks.GetStatus(), checks.GetError()); } } @@ -76,7 +76,7 @@ public: .IsTheSameDomain(domainPath); if (!checks) { - return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); + return Reply(checks.GetStatus(), checks.GetError()); } } @@ -96,7 +96,7 @@ public: checks .IsResolved() .NotUnderDeleting() - .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, false); + .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, settings.if_not_exist()); } else { checks .NotEmpty() @@ -114,7 +114,7 @@ public: .ShardsLimit(1); // impl-table if (!checks) { - return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); + return Reply(checks.GetStatus(), checks.GetError()); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h index 4e93f6a9183..18b52b122fa 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h @@ -95,6 +95,13 @@ public: Send(Request->Sender, std::move(Response), 0, Request->Cookie); return true; } + + bool Reply(const NKikimrScheme::EStatus status, const TString& errorMessage) + { + Y_ABORT_UNLESS(Response); + Response->Record.SetSchemeStatus(status); + return Reply(TranslateStatusCode(status), errorMessage); + } }; } // NSchemeShard diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 6d52c7be704..33816b261e8 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -2221,11 +2221,11 @@ public: return nullptr; } - //std::vector<TAstNode*> flags; - //flags.emplace_back(QA("pg")); - //if (value->if_not_exists) { - // flags.emplace_back(QA("ifNotExists")); - //} + std::vector<TAstNode*> flags; + flags.emplace_back(QA("pg")); + if (value->if_not_exists) { + flags.emplace_back(QA("ifNotExists")); + } std::vector<TAstNode*> desc; auto indexNameAtom = QA("indexName"); @@ -2237,7 +2237,7 @@ public: desc.emplace_back(QL(QA("indexType"), QA(value->unique ? "syncGlobalUnique" : "syncGlobal"))); desc.emplace_back(QL(QA("indexColumns"), QVL(columns->data(), columns->size()))); desc.emplace_back(QL(QA("dataColumns"), QVL(coverColumns->data(), coverColumns->size()))); - //desc.emplace_back(QL(QA("flags"), QVL(flags.data(), flags.size()))); + desc.emplace_back(QL(QA("flags"), QVL(flags.data(), flags.size()))); Statements.push_back(L( A("let"), |