aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-10-13 12:57:07 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-10-13 13:53:01 +0300
commit7ee659757df24a301c15da203481cd5b60e0bf67 (patch)
tree993fdb553aac4170f96dd571d882760070e8162d
parent9d5156a9810bb20f3c0406268a8beb2701a114c3 (diff)
downloadydb-7ee659757df24a301c15da203481cd5b60e0bf67.tar.gz
KIKIMR-18957: Create if not exists
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp15
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp3
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp9
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp3
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json3
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp4
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp59
-rw-r--r--ydb/core/protos/flat_scheme_op.proto4
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp51
11 files changed, 114 insertions, 44 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 6ea099cdfa..9eb5d1afc1 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -118,14 +118,13 @@ public:
}
auto promise = NewPromise<IKqpGateway::TGenericResult>();
-
- bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().HasSuccessOnNotExist()
- ? ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist()
- : false;
+
+ bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist();
+ bool failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists();
IActor* requestHandler = new TSchemeOpRequestHandler(
- ev.Release(),
- promise,
- true,
+ ev.Release(),
+ promise,
+ failedOnAlreadyExists,
successOnNotExist
);
RegisterWithSameMailbox(requestHandler);
@@ -135,7 +134,7 @@ public:
promise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
auto ev = MakeHolder<TEvPrivate::TEvResult>();
ev->Result = future.GetValue();
-
+
actorSystem->Send(selfId, ev.Release());
});
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index c2e25e560d..ff52c4026f 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -725,9 +725,10 @@ public:
return profilesPromise.GetFuture();
}
- TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir) override {
+ TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
Y_UNUSED(metadata);
Y_UNUSED(createDir);
+ Y_UNUSED(existingOk);
return NotImplemented<TGenericResult>();
}
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 8adbad2a83..a3876f43bd 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -418,7 +418,7 @@ public:
return Gateway->LoadTableMetadata(cluster, table, settings);
}
- TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir) override {
+ TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
CHECK_PREPARED_DDL(CreateTable);
std::pair<TString, TString> pathPair;
@@ -433,7 +433,7 @@ public:
auto profilesFuture = Gateway->GetTableProfiles();
auto tablePromise = NewPromise<TGenericResult>();
auto temporary = metadata->Temporary;
- profilesFuture.Subscribe([gateway, sessionCtx, metadata, tablePromise, pathPair, isPrepare, temporary]
+ profilesFuture.Subscribe([gateway, sessionCtx, metadata, tablePromise, pathPair, isPrepare, temporary, existingOk]
(const TFuture<IKqpGateway::TKqpTableProfilesResult>& future) mutable {
auto profilesResult = future.GetValue();
if (!profilesResult.Success()) {
@@ -510,6 +510,7 @@ public:
auto& phyQuery = *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ schemeTx.SetFailedOnAlreadyExists(!existingOk);
phyTx.MutableSchemeOperation()->MutableCreateTable()->Swap(&schemeTx);
TGenericResult result;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index 87f92fcca9..e4e0121f97 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -249,7 +249,7 @@ private:
? GetTableTypeFromString(settings.TableType.Cast())
: ETableType::Table; // v0 support
- if (mode == "create") {
+ if (mode == "create" || mode == "create_if_not_exists") {
if (!settings.Columns) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "No columns provided for create mode."));
@@ -691,7 +691,7 @@ public:
? settings.TableType.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("table").Done(); // v0 support
auto mode = settings.Mode.Cast();
- if (mode == "create") {
+ if (mode == "create" || mode == "create_if_not_exists") {
YQL_ENSURE(settings.Columns);
YQL_ENSURE(!settings.Columns.Cast().Empty());
@@ -721,6 +721,8 @@ public:
? settings.Temporary.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();
+ auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");
+
return Build<TKiCreateTable>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
@@ -738,6 +740,9 @@ public:
.ColumnsDefaultValues(settings.ColumnsDefaultValues.Cast())
.TableSettings(settings.TableSettings.Cast())
.TableType(tableType)
+ .ExistingOk<TCoAtom>()
+ .Value(existringOk)
+ .Build()
.Done()
.Ptr();
} else if (mode == "alter") {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 4242f27a14..82290e12d4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -941,6 +941,7 @@ public:
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
bool isColumn = (table.Metadata->StoreType == EStoreType::Column);
+ bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1");
switch (tableTypeItem) {
case ETableType::ExternalTable: {
future = Gateway->CreateExternalTable(cluster,
@@ -959,7 +960,7 @@ public:
}
case ETableType::Table:
case ETableType::Unknown: {
- future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true) : Gateway->CreateTable(table.Metadata, true);
+ future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true) : Gateway->CreateTable(table.Metadata, true, existingOk);
break;
}
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index 166ee1bc48..f386437d7a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -126,7 +126,8 @@
{"Index": 12, "Name": "NotNullColumns", "Type": "TCoAtomList"},
{"Index": 13, "Name": "SerialColumns", "Type": "TCoAtomList"},
{"Index": 14, "Name": "ColumnsDefaultValues", "Type": "TCoNameValueTupleList"},
- {"Index": 15, "Name": "Temporary", "Type": "TCoAtom"}
+ {"Index": 15, "Name": "Temporary", "Type": "TCoAtom"},
+ {"Index": 16, "Name": "ExistingOk", "Type": "TCoAtom"}
]
},
{
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index cb1dc64be3..9fa5fe8dbd 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -760,7 +760,7 @@ public:
virtual NThreading::TFuture<TTableMetadataResult> LoadTableMetadata(
const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) = 0;
- virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir) = 0;
+ virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType) = 0;
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 113e275760..06a55c8844 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -925,7 +925,9 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
}
auto& tableDesc = SessionCtx->Tables().GetTable(cluster, table);
- if (meta->TableType == ETableType::Table && tableDesc.DoesExist() && !tableDesc.Metadata->IsSameTable(*meta)) {
+
+ auto existingOk = (create.ExistingOk().Value() == "1");
+ if (!existingOk && meta->TableType == ETableType::Table && tableDesc.DoesExist() && !tableDesc.Metadata->IsSameTable(*meta)) {
ctx.AddError(TIssue(ctx.GetPosition(create.Pos()), TStringBuilder()
<< "Table name conflict: " << NCommon::FullTableName(cluster, table)
<< " is used to reference multiple tables."));
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index e7200202f8..dade31fd30 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -2221,6 +2221,63 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}
+ Y_UNIT_TEST(CreateTableIfNotExists_GenericQuery) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting})
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE test (
+ id int2 primary key
+ );
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE test (
+ id int4 primary key
+ );
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT(!result.IsSuccess());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE IF NOT EXISTS test (
+ id int4 primary key
+ );
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ DROP TABLE IF EXISTS test;
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot find table 'db.[/Root/test]'"));
+ }
+ }
+
Y_UNIT_TEST(DropTableIfExists) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
{
@@ -2531,7 +2588,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
{
auto result = db.ExecuteQuery(R"(
INSERT INTO t VALUES (1, 'a', 'a');
- )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT(result.GetIssues().ToString().Contains("VALUES have 3 columns, INSERT INTO expects: 2"));
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 8d02114ab4..4e81e669c0 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1166,7 +1166,7 @@ message TBlockStoreAssignOp {
optional uint64 TokenVersion = 3;
}
-message TDropBlockStoreVolume {
+message TDropBlockStoreVolume {
optional uint64 FillGeneration = 1;
}
@@ -1494,6 +1494,8 @@ message TModifyScheme {
optional NKikimrIndexBuilder.TColumnBuildSettings InitiateColumnBuild = 61;
optional bool SuccessOnNotExist = 62;
+
+ optional bool FailedOnAlreadyExists = 63 [default = true];
}
// "Script", used by client to parse text files with multiple DDL commands
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 68944b468b..6d52c7be70 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -116,7 +116,7 @@ bool ValueAsString(const Value& val, TString& ret) {
ret = StrFloatVal(val);
return true;
}
- case T_String:
+ case T_String:
case T_BitString: {
ret = StrVal(val);
return true;
@@ -214,7 +214,7 @@ public:
TVector<TString> ColNames;
TAstNode* Source = nullptr;
};
-
+
struct TPgConst {
TMaybe<TString> value;
enum class Type {
@@ -425,7 +425,7 @@ public:
std::make_move_iterator(next_row_items_to)
};
}
-
+
return listOfTuples;
}
@@ -460,7 +460,7 @@ public:
}
return true;
}
-
+
TMaybe<TVector<TPgConst::Type>> InferColumnTypesForValuesStmt(const TVector<TPgConst>& values, size_t cols) {
Y_ABORT_UNLESS((values.size() % cols == 0), "wrong amount of columns for auto param values vector");
TVector<TMaybe<TPgConst::Type>> maybeColumnTypes(cols);
@@ -469,7 +469,7 @@ public:
const auto& value = values[i];
size_t col = i % cols;
auto& columnType = maybeColumnTypes[col];
-
+
if (!columnType || columnType.GetRef() == TPgConst::Type::unknown || columnType.GetRef() == TPgConst::Type::nil) {
columnType = value.type;
continue;
@@ -477,8 +477,8 @@ public:
// should we allow compatible types here?
if (columnType.GetRef() != value.type && columnType.GetRef() != TPgConst::Type::unknown && columnType.GetRef() != TPgConst::Type::nil) {
- YQL_CLOG(INFO, Default)
- << "Failed to auto parametrize: different types: "
+ YQL_CLOG(INFO, Default)
+ << "Failed to auto parametrize: different types: "
<< TPgConst::ToString(columnType.GetRef()) << " and " << TPgConst::ToString(value.type)
<< " in col " << col;
return {};
@@ -495,14 +495,14 @@ public:
}
return columnTypes;
}
-
+
using TAutoParamName = TString;
TAutoParamName AddAutoParam(Ydb::TypedValue&& val) {
auto nextName = TString(AUTO_PARAM_PREFIX) + ToString(AutoParamValues.size());
AutoParamValues.emplace(nextName, std::move(val));
return nextName;
}
-
+
TAstNode* MakeValuesStmtAutoParam(TVector<TPgConst>&& values, TVector<TPgConst::Type>&& columnTypes) {
TVector<Ydb::Value> ydbValues;
for (auto&& pgConst : values) {
@@ -582,7 +582,7 @@ public:
return QL(QA("values"), QVL(valNames.data(), valNames.size()), valuesNode);
}
}
-
+
TVector<TAstNode*> valueRows;
valueRows.reserve(ListLength(valuesLists));
valueRows.push_back(A("AsList"));
@@ -1287,11 +1287,11 @@ public:
const auto select = (value->selectStmt)
? ParseSelectStmt(
- CAST_NODE(SelectStmt, value->selectStmt),
- true,
- targetColumns,
- /*allowEmptyResSet=*/false,
- /*emitPgStar=*/false,
+ CAST_NODE(SelectStmt, value->selectStmt),
+ true,
+ targetColumns,
+ /*allowEmptyResSet=*/false,
+ /*emitPgStar=*/false,
/*fillTargetColumns=*/true)
: L(A("Void"));
if (!select) {
@@ -1455,6 +1455,7 @@ private:
std::unordered_set<TString> NotNullColSet;
bool isTemporary;
std::vector<TAstNode*> SerialColumns;
+ bool ifNotExists;
};
bool CheckConstraintSupported(const Constraint* pk) {
@@ -1649,7 +1650,8 @@ private:
TAstNode* BuildCreateTableOptions(TCreateTableCtx& ctx) {
std::vector<TAstNode*> options;
- options.push_back(QL(QA("mode"), QA("create")));
+ TString mode = (ctx.ifNotExists) ? "create_if_not_exists" : "create";
+ options.push_back(QL(QA("mode"), QA(mode)));
options.push_back(QL(QA("columns"), QVL(ctx.Columns.data(), ctx.Columns.size())));
if (!ctx.PrimaryKey.empty()) {
options.push_back(QL(QA("primarykey"), QVL(ctx.PrimaryKey.data(), ctx.PrimaryKey.size())));
@@ -1738,13 +1740,12 @@ public:
return nullptr;
}
+ TCreateTableCtx ctx {};
+
if (value->if_not_exists) {
- AddError("IF NOT EXISTS not supported");
- return nullptr;
+ ctx.ifNotExists = true;
}
- TCreateTableCtx ctx {};
-
const auto relPersistence = static_cast<NPg::ERelPersistence>(value->relation->relpersistence);
switch (relPersistence) {
case NPg::ERelPersistence::Temp:
@@ -2766,7 +2767,7 @@ public:
return nullptr;
}
}
-
+
TMaybe<TPgConst> GetValueNType(const A_Const* value) {
TPgConst pgConst;
const auto& val = value->val;
@@ -2804,7 +2805,7 @@ public:
}
}
}
-
+
TAstNode* AutoParametrizeConst(TPgConst&& valueNType, TAstNode* pgType) {
Ydb::TypedValue typedValue;
@@ -2835,11 +2836,11 @@ public:
return nullptr;
}
- TAstNode* pgTypeNode = NodeTag(val) != T_Null
+ TAstNode* pgTypeNode = NodeTag(val) != T_Null
? L(A("PgType"), QA(TPgConst::ToString(valueNType->type)))
: L(A("PgType"), QA("unknown"));
- if (Settings.AutoParametrizeEnabled &&
+ if (Settings.AutoParametrizeEnabled &&
Settings.AutoParametrizeEnabledScopes.contains(settings.Scope)) {
return AutoParametrizeConst(std::move(valueNType.GetRef()), pgTypeNode);
}
@@ -3979,7 +3980,7 @@ public:
TAstNode* QVL(TAstNode* node, TPosition pos = {}) {
return QVL(&node, 1, pos);
}
-
+
TAstNode* QVL(TArrayRef<TAstNode*> nodes, TPosition pos = {}) {
return Q(VL(nodes, pos), pos);
}