aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-10-13 16:28:25 +0300
committerdcherednik <dcherednik@ydb.tech>2023-10-13 17:11:47 +0300
commit6dbb54906effccb3979c234774e1b4d2b3ce83cc (patch)
tree55e55a09036751ce189e7d1c6abaf6b3c52dcd66
parentf0121e285828f6e7a7698036f3eac325f8e1c6ac (diff)
downloadydb-6dbb54906effccb3979c234774e1b4d2b3ce83cc.tar.gz
[pg] IF NOT EXISTS support for CREATE INDEX
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp2
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp5
-rw-r--r--ydb/core/kqp/gateway/local_rpc/helper.cpp25
-rw-r--r--ydb/core/kqp/gateway/local_rpc/helper.h2
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp17
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h3
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp14
-rw-r--r--ydb/core/protos/index_builder.proto4
-rw-r--r--ydb/core/protos/kqp_physical.proto6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h7
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp12
14 files changed, 96 insertions, 37 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index fe01512b5aa..374864e0aa6 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -142,12 +142,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
}
public:
- enum EFlags : ui8 {
- Default = 0,
- PgMode = 1
- };
-
- TAlterTableRPC(IRequestOpCtx* msg, ui8 flags = Default)
+ TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED)
: TBase(msg)
, Flags(flags)
{}
@@ -407,9 +402,12 @@ private:
const auto& req = *GetProtoRequest();
NKikimrIndexBuilder::TIndexBuildSettings settings;
- if (Flags & PgMode) {
+ if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) {
settings.set_pg_mode(true);
}
+ if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) {
+ settings.set_if_not_exist(true);
+ }
settings.set_source_path(req.path());
auto tableIndex = settings.mutable_index();
tableIndex->CopyFrom(req.add_indexes(0));
@@ -435,7 +433,9 @@ private:
<< ", Id# " << response.GetIndexBuild().GetId());
if (status == Ydb::StatusIds::SUCCESS) {
- if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) {
+ if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) {
+ Reply(status, issuesProto, ctx);
+ } else if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) {
CreateSSOpSubscriber(SchemeshardId, TxId, DatabaseName, TOpType::BuildIndex, std::move(Request_), ctx);
Die(ctx);
} else {
@@ -723,7 +723,7 @@ private:
TTableProfiles Profiles;
EOp OpType;
- const ui8 Flags;
+ const ui64 Flags;
};
void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
@@ -735,8 +735,8 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
return new TAlterTableRPC(msg);
}
-IActor* CreatePgAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
- return new TAlterTableRPC(msg, TAlterTableRPC::PgMode);
+IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags) {
+ return new TAlterTableRPC(msg, flags);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 9eb5d1afc11..550fbfefe28 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -104,7 +104,7 @@ public:
}
auto cb = GetAlterTableRespHandler();
- DoAlterTableSameMailbox(std::move(*alter.MutableReq()), std::move(cb),
+ DoAlterTableSameMailbox(std::move(alter), std::move(cb),
Database, token, type);
Become(&TKqpSchemeExecuter::ExecuteState);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index ff52c4026f7..53e173e775f 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -798,8 +798,11 @@ public:
}
}
- TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) override {
+ TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
+ const TMaybe<TString>& requestType, ui64 flags) override
+ {
try {
+ YQL_ENSURE(!flags); //Supported only for prepared mode
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}
diff --git a/ydb/core/kqp/gateway/local_rpc/helper.cpp b/ydb/core/kqp/gateway/local_rpc/helper.cpp
index 4681d0f08ff..86a60a85460 100644
--- a/ydb/core/kqp/gateway/local_rpc/helper.cpp
+++ b/ydb/core/kqp/gateway/local_rpc/helper.cpp
@@ -8,14 +8,29 @@ namespace NKikimr {
namespace NGRpcService {
-IActor* CreatePgAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg);
+IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags);
-using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
+class TExtendedAlterTableRequest : public Ydb::Table::AlterTableRequest {
+public:
+ TExtendedAlterTableRequest(NKqpProto::TKqpSchemeOperation::TAlterTable&& alter)
+ : Ydb::Table::AlterTableRequest(std::move(*alter.MutableReq()))
+ , ExtendedFlags(alter.GetFlags())
+ {}
+
+ ui64 GetExtendedFlags() const {
+ return ExtendedFlags;
+ }
+private:
+ const ui64 ExtendedFlags;
+};
+
+using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<TExtendedAlterTableRequest,
Ydb::Table::AlterTableResponse>;
struct TEvPgAlterTableRequest : public TEvAlterTableRequest {
- static IActor* CreateRpcActor(IRequestOpCtx* msg) {
- return CreatePgAlterTableRpcActor(msg);
+ static IActor* CreateRpcActor(IRequestOpCtx* ctx) {
+ auto request = static_cast<const TExtendedAlterTableRequest*>(ctx->GetRequest());
+ return CreateExtAlterTableRpcActor(ctx, request->GetExtendedFlags());
}
};
@@ -42,7 +57,7 @@ IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Operations::
}
}
-void DoAlterTableSameMailbox(Ydb::Table::AlterTableRequest&& req, TAlterTableRespHandler&& cb,
+void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb,
const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type)
{
NRpcService::DoLocalRpcSameMailbox<NGRpcService::TEvPgAlterTableRequest>(std::move(req), std::move(cb),
diff --git a/ydb/core/kqp/gateway/local_rpc/helper.h b/ydb/core/kqp/gateway/local_rpc/helper.h
index faeafd874b0..769a3fed644 100644
--- a/ydb/core/kqp/gateway/local_rpc/helper.h
+++ b/ydb/core/kqp/gateway/local_rpc/helper.h
@@ -8,7 +8,7 @@ namespace NKqp {
NYql::IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Ydb::Operations::Operation& op);
using TAlterTableRespHandler = std::function<void(const Ydb::Table::AlterTableResponse& r)>;
-void DoAlterTableSameMailbox(Ydb::Table::AlterTableRequest&& req, TAlterTableRespHandler&& cb,
+void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb,
const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type);
}
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index a3876f43bd0..12ddae56e06 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -544,7 +544,7 @@ public:
}
TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
- const TMaybe<TString>& requestType) override
+ const TMaybe<TString>& requestType, ui64 flags) override
{
CHECK_PREPARED_DDL(AlterTable);
@@ -558,13 +558,15 @@ public:
alter->SetType(*requestType);
}
+ alter->SetFlags(flags);
+
auto promise = NewPromise<TGenericResult>();
TGenericResult result;
result.SetSuccess();
promise.SetValue(result);
return promise.GetFuture();
} else {
- return Gateway->AlterTable(cluster, std::move(req), requestType);
+ return Gateway->AlterTable(cluster, std::move(req), requestType, flags);
}
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 82290e12d45..22abf675858 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1059,6 +1059,7 @@ public:
return SyncError();
}
+ ui64 alterTableFlags = 0; //additional flags to pass options without public api modification
Ydb::Table::AlterTableRequest alterTableRequest;
alterTableRequest.set_path(table.Metadata->Name);
@@ -1333,6 +1334,20 @@ public:
TString columnName(column.Value());
add_index->add_data_columns(columnName);
}
+ } else if (name == "flags") {
+ auto flagList = columnTuple.Item(1).Cast<TCoAtomList>();
+ for (auto flag : flagList) {
+ TString flagName(flag.Value());
+ if (flagName == "pg") {
+ alterTableFlags |= NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE;
+ } else if (flagName == "ifNotExists") {
+ alterTableFlags |= NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS;
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()),
+ TStringBuilder() << "Unknown add index flag: " << flagName));
+ return SyncError();
+ }
+ }
} else {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()),
TStringBuilder() << "Unknown add index setting: " << name));
@@ -1524,7 +1539,7 @@ public:
if (!SessionCtx->Query().DocumentApiRestricted) {
requestType = NKikimr::NDocApi::RequestType;
}
- future = Gateway->AlterTable(cluster, std::move(alterTableRequest), requestType);
+ future = Gateway->AlterTable(cluster, std::move(alterTableRequest), requestType, alterTableFlags);
}
return WrapFuture(future,
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 9fa5fe8dbdc..25a17a4b2de 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -762,7 +762,8 @@ public:
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
- virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) = 0;
+ virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
+ const TMaybe<TString>& requestType, ui64 flags) = 0;
virtual NThreading::TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) = 0;
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index dade31fd308..11646910acf 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1537,17 +1537,25 @@ Y_UNIT_TEST_SUITE(KqpPg) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
}
-/*
{
const auto query = Q_(R"(
--!syntax_pg
CREATE INDEX IF NOT EXISTS "test_fk_idx" ON test (fk);
)");
- auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
-*/
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE INDEX IF NOT EXISTS "test" ON test (fk);
+ )");
+
+ auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
}
Y_UNIT_TEST(CreateUniqPgColumn) {
diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto
index 7f2d7d0dcbc..303fe51067a 100644
--- a/ydb/core/protos/index_builder.proto
+++ b/ydb/core/protos/index_builder.proto
@@ -21,7 +21,8 @@ message TIndexBuildSettings {
optional string source_path = 1;
optional Ydb.Table.TableIndex index = 2;
optional TColumnBuildSettings column_build_operation = 7;
- optional bool pg_mode = 8;
+ optional bool pg_mode = 8 [ default = false];
+ optional bool if_not_exist = 9 [ default = false];
optional uint32 max_batch_rows = 3 [ default = 500 ];
optional uint64 max_batch_bytes = 4 [ default = 8388608 ];
@@ -49,6 +50,7 @@ message TEvCreateResponse {
optional Ydb.StatusIds.StatusCode Status = 2;
repeated Ydb.Issue.IssueMessage Issues = 3;
optional TIndexBuild IndexBuild = 4;
+ optional sint32 SchemeStatus = 5 [default = -1]; //flat_tx_scheme.proto - enum EStatus, -1 is inexistent enum value
}
message TEvGetRequest {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index d1810e9c022..92ce2f435dc 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -372,9 +372,15 @@ message TKqpPhyResult {
}
message TKqpSchemeOperation {
+ enum EFlags {
+ FLAG_UNSPECIFIED = 0;
+ FLAG_PG_MODE = 1; // set if pg compatible mode
+ FLAG_IF_NOT_EXISTS = 2; // set if IF_NOT_EXISTS modificator present
+ };
message TAlterTable {
Ydb.Table.AlterTableRequest Req = 1;
string Type = 2;
+ uint64 Flags = 3;
}
oneof Operation {
NKikimrSchemeOp.TModifyScheme CreateTable = 1;
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
index 560446ca560..2fa54dafd6c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
@@ -45,7 +45,7 @@ public:
.NotUnderDomainUpgrade();
if (!checks) {
- return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError());
+ return Reply(checks.GetStatus(), checks.GetError());
}
}
@@ -76,7 +76,7 @@ public:
.IsTheSameDomain(domainPath);
if (!checks) {
- return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError());
+ return Reply(checks.GetStatus(), checks.GetError());
}
}
@@ -96,7 +96,7 @@ public:
checks
.IsResolved()
.NotUnderDeleting()
- .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, false);
+ .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, settings.if_not_exist());
} else {
checks
.NotEmpty()
@@ -114,7 +114,7 @@ public:
.ShardsLimit(1); // impl-table
if (!checks) {
- return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError());
+ return Reply(checks.GetStatus(), checks.GetError());
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
index 4e93f6a9183..18b52b122fa 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
@@ -95,6 +95,13 @@ public:
Send(Request->Sender, std::move(Response), 0, Request->Cookie);
return true;
}
+
+ bool Reply(const NKikimrScheme::EStatus status, const TString& errorMessage)
+ {
+ Y_ABORT_UNLESS(Response);
+ Response->Record.SetSchemeStatus(status);
+ return Reply(TranslateStatusCode(status), errorMessage);
+ }
};
} // NSchemeShard
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 6d52c7be704..33816b261e8 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -2221,11 +2221,11 @@ public:
return nullptr;
}
- //std::vector<TAstNode*> flags;
- //flags.emplace_back(QA("pg"));
- //if (value->if_not_exists) {
- // flags.emplace_back(QA("ifNotExists"));
- //}
+ std::vector<TAstNode*> flags;
+ flags.emplace_back(QA("pg"));
+ if (value->if_not_exists) {
+ flags.emplace_back(QA("ifNotExists"));
+ }
std::vector<TAstNode*> desc;
auto indexNameAtom = QA("indexName");
@@ -2237,7 +2237,7 @@ public:
desc.emplace_back(QL(QA("indexType"), QA(value->unique ? "syncGlobalUnique" : "syncGlobal")));
desc.emplace_back(QL(QA("indexColumns"), QVL(columns->data(), columns->size())));
desc.emplace_back(QL(QA("dataColumns"), QVL(coverColumns->data(), coverColumns->size())));
- //desc.emplace_back(QL(QA("flags"), QVL(flags.data(), flags.size())));
+ desc.emplace_back(QL(QA("flags"), QVL(flags.data(), flags.size())));
Statements.push_back(L(
A("let"),