diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-10-13 12:57:07 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-10-13 13:53:01 +0300 |
commit | 7ee659757df24a301c15da203481cd5b60e0bf67 (patch) | |
tree | 993fdb553aac4170f96dd571d882760070e8162d | |
parent | 9d5156a9810bb20f3c0406268a8beb2701a114c3 (diff) | |
download | ydb-7ee659757df24a301c15da203481cd5b60e0bf67.tar.gz |
KIKIMR-18957: Create if not exists
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_expr_nodes.json | 3 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 59 | ||||
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 4 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql.cpp | 51 |
11 files changed, 114 insertions, 44 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 6ea099cdfa..9eb5d1afc1 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -118,14 +118,13 @@ public: } auto promise = NewPromise<IKqpGateway::TGenericResult>(); - - bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().HasSuccessOnNotExist() - ? ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist() - : false; + + bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist(); + bool failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists(); IActor* requestHandler = new TSchemeOpRequestHandler( - ev.Release(), - promise, - true, + ev.Release(), + promise, + failedOnAlreadyExists, successOnNotExist ); RegisterWithSameMailbox(requestHandler); @@ -135,7 +134,7 @@ public: promise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) { auto ev = MakeHolder<TEvPrivate::TEvResult>(); ev->Result = future.GetValue(); - + actorSystem->Send(selfId, ev.Release()); }); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index c2e25e560d..ff52c4026f 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -725,9 +725,10 @@ public: return profilesPromise.GetFuture(); } - TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir) override { + TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override { Y_UNUSED(metadata); Y_UNUSED(createDir); + Y_UNUSED(existingOk); return NotImplemented<TGenericResult>(); } diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 8adbad2a83..a3876f43bd 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -418,7 +418,7 @@ public: return Gateway->LoadTableMetadata(cluster, table, settings); } - TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir) override { + TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override { CHECK_PREPARED_DDL(CreateTable); std::pair<TString, TString> pathPair; @@ -433,7 +433,7 @@ public: auto profilesFuture = Gateway->GetTableProfiles(); auto tablePromise = NewPromise<TGenericResult>(); auto temporary = metadata->Temporary; - profilesFuture.Subscribe([gateway, sessionCtx, metadata, tablePromise, pathPair, isPrepare, temporary] + profilesFuture.Subscribe([gateway, sessionCtx, metadata, tablePromise, pathPair, isPrepare, temporary, existingOk] (const TFuture<IKqpGateway::TKqpTableProfilesResult>& future) mutable { auto profilesResult = future.GetValue(); if (!profilesResult.Success()) { @@ -510,6 +510,7 @@ public: auto& phyQuery = *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); auto& phyTx = *phyQuery.AddTransactions(); phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + schemeTx.SetFailedOnAlreadyExists(!existingOk); phyTx.MutableSchemeOperation()->MutableCreateTable()->Swap(&schemeTx); TGenericResult result; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index 87f92fcca9..e4e0121f97 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -249,7 +249,7 @@ private: ? GetTableTypeFromString(settings.TableType.Cast()) : ETableType::Table; // v0 support - if (mode == "create") { + if (mode == "create" || mode == "create_if_not_exists") { if (!settings.Columns) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "No columns provided for create mode.")); @@ -691,7 +691,7 @@ public: ? settings.TableType.Cast() : Build<TCoAtom>(ctx, node->Pos()).Value("table").Done(); // v0 support auto mode = settings.Mode.Cast(); - if (mode == "create") { + if (mode == "create" || mode == "create_if_not_exists") { YQL_ENSURE(settings.Columns); YQL_ENSURE(!settings.Columns.Cast().Empty()); @@ -721,6 +721,8 @@ public: ? settings.Temporary.Cast() : Build<TCoAtom>(ctx, node->Pos()).Value("false").Done(); + auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists"); + return Build<TKiCreateTable>(ctx, node->Pos()) .World(node->Child(0)) .DataSink(node->Child(1)) @@ -738,6 +740,9 @@ public: .ColumnsDefaultValues(settings.ColumnsDefaultValues.Cast()) .TableSettings(settings.TableSettings.Cast()) .TableType(tableType) + .ExistingOk<TCoAtom>() + .Value(existringOk) + .Build() .Done() .Ptr(); } else if (mode == "alter") { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 4242f27a14..82290e12d4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -941,6 +941,7 @@ public: NThreading::TFuture<IKikimrGateway::TGenericResult> future; bool isColumn = (table.Metadata->StoreType == EStoreType::Column); + bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1"); switch (tableTypeItem) { case ETableType::ExternalTable: { future = Gateway->CreateExternalTable(cluster, @@ -959,7 +960,7 @@ public: } case ETableType::Table: case ETableType::Unknown: { - future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true) : Gateway->CreateTable(table.Metadata, true); + future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true) : Gateway->CreateTable(table.Metadata, true, existingOk); break; } } diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index 166ee1bc48..f386437d7a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -126,7 +126,8 @@ {"Index": 12, "Name": "NotNullColumns", "Type": "TCoAtomList"}, {"Index": 13, "Name": "SerialColumns", "Type": "TCoAtomList"}, {"Index": 14, "Name": "ColumnsDefaultValues", "Type": "TCoNameValueTupleList"}, - {"Index": 15, "Name": "Temporary", "Type": "TCoAtom"} + {"Index": 15, "Name": "Temporary", "Type": "TCoAtom"}, + {"Index": 16, "Name": "ExistingOk", "Type": "TCoAtom"} ] }, { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index cb1dc64be3..9fa5fe8dbd 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -760,7 +760,7 @@ public: virtual NThreading::TFuture<TTableMetadataResult> LoadTableMetadata( const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) = 0; - virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir) = 0; + virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0; virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) = 0; diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 113e275760..06a55c8844 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -925,7 +925,9 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over } auto& tableDesc = SessionCtx->Tables().GetTable(cluster, table); - if (meta->TableType == ETableType::Table && tableDesc.DoesExist() && !tableDesc.Metadata->IsSameTable(*meta)) { + + auto existingOk = (create.ExistingOk().Value() == "1"); + if (!existingOk && meta->TableType == ETableType::Table && tableDesc.DoesExist() && !tableDesc.Metadata->IsSameTable(*meta)) { ctx.AddError(TIssue(ctx.GetPosition(create.Pos()), TStringBuilder() << "Table name conflict: " << NCommon::FullTableName(cluster, table) << " is used to reference multiple tables.")); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index e7200202f8..dade31fd30 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2221,6 +2221,63 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(CreateTableIfNotExists_GenericQuery) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}) + .SetWithSampleTables(false); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE test ( + id int2 primary key + ); + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE test ( + id int4 primary key + ); + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + } + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE IF NOT EXISTS test ( + id int4 primary key + ); + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM test; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + DROP TABLE IF EXISTS test; + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM test; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot find table 'db.[/Root/test]'")); + } + } + Y_UNIT_TEST(DropTableIfExists) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); { @@ -2531,7 +2588,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { { auto result = db.ExecuteQuery(R"( INSERT INTO t VALUES (1, 'a', 'a'); - )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); UNIT_ASSERT(result.GetIssues().ToString().Contains("VALUES have 3 columns, INSERT INTO expects: 2")); } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 8d02114ab4..4e81e669c0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1166,7 +1166,7 @@ message TBlockStoreAssignOp { optional uint64 TokenVersion = 3; } -message TDropBlockStoreVolume { +message TDropBlockStoreVolume { optional uint64 FillGeneration = 1; } @@ -1494,6 +1494,8 @@ message TModifyScheme { optional NKikimrIndexBuilder.TColumnBuildSettings InitiateColumnBuild = 61; optional bool SuccessOnNotExist = 62; + + optional bool FailedOnAlreadyExists = 63 [default = true]; } // "Script", used by client to parse text files with multiple DDL commands diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 68944b468b..6d52c7be70 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -116,7 +116,7 @@ bool ValueAsString(const Value& val, TString& ret) { ret = StrFloatVal(val); return true; } - case T_String: + case T_String: case T_BitString: { ret = StrVal(val); return true; @@ -214,7 +214,7 @@ public: TVector<TString> ColNames; TAstNode* Source = nullptr; }; - + struct TPgConst { TMaybe<TString> value; enum class Type { @@ -425,7 +425,7 @@ public: std::make_move_iterator(next_row_items_to) }; } - + return listOfTuples; } @@ -460,7 +460,7 @@ public: } return true; } - + TMaybe<TVector<TPgConst::Type>> InferColumnTypesForValuesStmt(const TVector<TPgConst>& values, size_t cols) { Y_ABORT_UNLESS((values.size() % cols == 0), "wrong amount of columns for auto param values vector"); TVector<TMaybe<TPgConst::Type>> maybeColumnTypes(cols); @@ -469,7 +469,7 @@ public: const auto& value = values[i]; size_t col = i % cols; auto& columnType = maybeColumnTypes[col]; - + if (!columnType || columnType.GetRef() == TPgConst::Type::unknown || columnType.GetRef() == TPgConst::Type::nil) { columnType = value.type; continue; @@ -477,8 +477,8 @@ public: // should we allow compatible types here? if (columnType.GetRef() != value.type && columnType.GetRef() != TPgConst::Type::unknown && columnType.GetRef() != TPgConst::Type::nil) { - YQL_CLOG(INFO, Default) - << "Failed to auto parametrize: different types: " + YQL_CLOG(INFO, Default) + << "Failed to auto parametrize: different types: " << TPgConst::ToString(columnType.GetRef()) << " and " << TPgConst::ToString(value.type) << " in col " << col; return {}; @@ -495,14 +495,14 @@ public: } return columnTypes; } - + using TAutoParamName = TString; TAutoParamName AddAutoParam(Ydb::TypedValue&& val) { auto nextName = TString(AUTO_PARAM_PREFIX) + ToString(AutoParamValues.size()); AutoParamValues.emplace(nextName, std::move(val)); return nextName; } - + TAstNode* MakeValuesStmtAutoParam(TVector<TPgConst>&& values, TVector<TPgConst::Type>&& columnTypes) { TVector<Ydb::Value> ydbValues; for (auto&& pgConst : values) { @@ -582,7 +582,7 @@ public: return QL(QA("values"), QVL(valNames.data(), valNames.size()), valuesNode); } } - + TVector<TAstNode*> valueRows; valueRows.reserve(ListLength(valuesLists)); valueRows.push_back(A("AsList")); @@ -1287,11 +1287,11 @@ public: const auto select = (value->selectStmt) ? ParseSelectStmt( - CAST_NODE(SelectStmt, value->selectStmt), - true, - targetColumns, - /*allowEmptyResSet=*/false, - /*emitPgStar=*/false, + CAST_NODE(SelectStmt, value->selectStmt), + true, + targetColumns, + /*allowEmptyResSet=*/false, + /*emitPgStar=*/false, /*fillTargetColumns=*/true) : L(A("Void")); if (!select) { @@ -1455,6 +1455,7 @@ private: std::unordered_set<TString> NotNullColSet; bool isTemporary; std::vector<TAstNode*> SerialColumns; + bool ifNotExists; }; bool CheckConstraintSupported(const Constraint* pk) { @@ -1649,7 +1650,8 @@ private: TAstNode* BuildCreateTableOptions(TCreateTableCtx& ctx) { std::vector<TAstNode*> options; - options.push_back(QL(QA("mode"), QA("create"))); + TString mode = (ctx.ifNotExists) ? "create_if_not_exists" : "create"; + options.push_back(QL(QA("mode"), QA(mode))); options.push_back(QL(QA("columns"), QVL(ctx.Columns.data(), ctx.Columns.size()))); if (!ctx.PrimaryKey.empty()) { options.push_back(QL(QA("primarykey"), QVL(ctx.PrimaryKey.data(), ctx.PrimaryKey.size()))); @@ -1738,13 +1740,12 @@ public: return nullptr; } + TCreateTableCtx ctx {}; + if (value->if_not_exists) { - AddError("IF NOT EXISTS not supported"); - return nullptr; + ctx.ifNotExists = true; } - TCreateTableCtx ctx {}; - const auto relPersistence = static_cast<NPg::ERelPersistence>(value->relation->relpersistence); switch (relPersistence) { case NPg::ERelPersistence::Temp: @@ -2766,7 +2767,7 @@ public: return nullptr; } } - + TMaybe<TPgConst> GetValueNType(const A_Const* value) { TPgConst pgConst; const auto& val = value->val; @@ -2804,7 +2805,7 @@ public: } } } - + TAstNode* AutoParametrizeConst(TPgConst&& valueNType, TAstNode* pgType) { Ydb::TypedValue typedValue; @@ -2835,11 +2836,11 @@ public: return nullptr; } - TAstNode* pgTypeNode = NodeTag(val) != T_Null + TAstNode* pgTypeNode = NodeTag(val) != T_Null ? L(A("PgType"), QA(TPgConst::ToString(valueNType->type))) : L(A("PgType"), QA("unknown")); - if (Settings.AutoParametrizeEnabled && + if (Settings.AutoParametrizeEnabled && Settings.AutoParametrizeEnabledScopes.contains(settings.Scope)) { return AutoParametrizeConst(std::move(valueNType.GetRef()), pgTypeNode); } @@ -3979,7 +3980,7 @@ public: TAstNode* QVL(TAstNode* node, TPosition pos = {}) { return QVL(&node, 1, pos); } - + TAstNode* QVL(TArrayRef<TAstNode*> nodes, TPosition pos = {}) { return Q(VL(nodes, pos), pos); } |