aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-08-30 22:26:53 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-08-30 23:00:53 +0300
commitab38bfd73cf3e132ca92c4cd185599e8b018683c (patch)
tree45619117f72df8688ad220c0ccd55634944128b9
parent39994a5188cbe7a4d0ff1a6513f1556f75540fe8 (diff)
downloadydb-ab38bfd73cf3e132ca92c4cd185599e8b018683c.tar.gz
KIKIMR-19155: Add drop table to prepare ddl
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp26
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp43
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp61
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp78
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp32
-rw-r--r--ydb/core/protos/kqp_physical.proto1
7 files changed, 198 insertions, 47 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 3cec80e4ce..39a4106800 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -55,17 +55,27 @@ public:
}
const auto& schemeOp = PhyTx->GetSchemeOperation();
- auto modifyScheme = schemeOp.GetCreateTable();
-
- if (Temporary) {
- auto* createTable = modifyScheme.MutableCreateTable();
- createTable->SetName(createTable->GetName() + SessionId);
- createTable->SetPath(createTable->GetPath() + SessionId);
- }
switch (schemeOp.GetOperationCase()) {
- case NKqpProto::TKqpSchemeOperation::kCreateTable:
+ case NKqpProto::TKqpSchemeOperation::kCreateTable: {
+ auto modifyScheme = schemeOp.GetCreateTable();
+ if (Temporary) {
+ auto* createTable = modifyScheme.MutableCreateTable();
+ createTable->SetName(createTable->GetName() + SessionId);
+ createTable->SetPath(createTable->GetPath() + SessionId);
+ }
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
+ }
+
+ case NKqpProto::TKqpSchemeOperation::kDropTable: {
+ auto modifyScheme = schemeOp.GetDropTable();
+ if (Temporary) {
+ auto* dropTable = modifyScheme.MutableDrop();
+ dropTable->SetName(dropTable->GetName() + SessionId);
+ }
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 69f74faccc..6cf615ab50 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -553,7 +553,48 @@ public:
}
TFuture<TGenericResult> DropTable(const TString& cluster, const TString& table) override {
- FORWARD_ENSURE_NO_PREPARE(DropTable, cluster, table);
+ CHECK_PREPARED_DDL(DropTable);
+
+ auto metadata = SessionCtx->Tables().GetTable(cluster, table).Metadata;
+
+ std::pair<TString, TString> pathPair;
+ TString error;
+ if (!SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+
+ auto temporary = metadata->Temporary;
+ auto dropPromise = NewPromise<TGenericResult>();
+
+ NKikimrSchemeOp::TModifyScheme schemeTx;
+ schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable);
+ schemeTx.SetWorkingDir(pathPair.first);
+
+ auto* drop = schemeTx.MutableDrop();
+ drop->SetName(pathPair.second);
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx);
+
+ TGenericResult result;
+ result.SetSuccess();
+ dropPromise.SetValue(result);
+ } else {
+ if (temporary) {
+ auto code = Ydb::StatusIds::BAD_REQUEST;
+ auto error = TStringBuilder() << "Not allowed to drop temp table";
+ IKqpGateway::TGenericResult errResult;
+ errResult.AddIssue(NYql::TIssue(error));
+ errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+ dropPromise.SetValue(errResult);
+ }
+ return Gateway->DropTable(cluster, table);
+ }
+
+ return dropPromise.GetFuture();
}
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index bb70156364..a1c5c46d3b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -964,10 +964,6 @@ public:
}
if (auto maybeDrop = TMaybeNode<TKiDropTable>(input)) {
- if (!EnsureNotPrepare("DROP TABLE", input->Pos(), SessionCtx->Query(), ctx)) {
- return SyncError();
- }
-
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
return SyncStatus(requireStatus);
@@ -997,40 +993,35 @@ public:
return SyncError();
}
- bool prepareOnly = SessionCtx->Query().PrepareOnly;
bool missingOk = (maybeDrop.MissingOk().Cast().Value() == "1");
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
- if (prepareOnly) {
- future = CreateDummySuccess();
- } else {
- switch (tableTypeItem) {
- case ETableType::Table:
- future = Gateway->DropTable(table.Metadata->Cluster, table.Metadata->Name);
- if (missingOk) {
- future = future.Apply([](const NThreading::TFuture<IKikimrGateway::TGenericResult>& res) {
- auto operationResult = res.GetValue();
- bool pathNotExist = false;
- for (const auto& issue : operationResult.Issues()) {
- WalkThroughIssues(issue, false, [&pathNotExist](const NYql::TIssue& issue, int level) {
- Y_UNUSED(level);
- pathNotExist |= (issue.GetCode() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST);
- });
- }
- return pathNotExist ? CreateDummySuccess() : res;
- });
- }
- break;
- case ETableType::TableStore:
- future = Gateway->DropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast()));
- break;
- case ETableType::ExternalTable:
- future = Gateway->DropExternalTable(cluster, ParseDropExternalTableSettings(maybeDrop.Cast()));
- break;
- case ETableType::Unknown:
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Unsupported table type " << tableTypeString));
- return SyncError();
- }
+ switch (tableTypeItem) {
+ case ETableType::Table:
+ future = Gateway->DropTable(table.Metadata->Cluster, table.Metadata->Name);
+ if (missingOk) {
+ future = future.Apply([](const NThreading::TFuture<IKikimrGateway::TGenericResult>& res) {
+ auto operationResult = res.GetValue();
+ bool pathNotExist = false;
+ for (const auto& issue : operationResult.Issues()) {
+ WalkThroughIssues(issue, false, [&pathNotExist](const NYql::TIssue& issue, int level) {
+ Y_UNUSED(level);
+ pathNotExist |= (issue.GetCode() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST);
+ });
+ }
+ return pathNotExist ? CreateDummySuccess() : res;
+ });
+ }
+ break;
+ case ETableType::TableStore:
+ future = Gateway->DropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast()));
+ break;
+ case ETableType::ExternalTable:
+ future = Gateway->DropExternalTable(cluster, ParseDropExternalTableSettings(maybeDrop.Cast()));
+ break;
+ case ETableType::Unknown:
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Unsupported table type " << tableTypeString));
+ return SyncError();
}
return WrapFuture(future,
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index bf1d4ad267..c2e4131a3f 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1460,7 +1460,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
Y_UNIT_TEST(CreateTempTable) {
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
@@ -1542,6 +1542,11 @@ Y_UNIT_TEST_SUITE(KqpPg) {
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
+ bool allDoneOk = true;
+ NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);
+
+ UNIT_ASSERT(allDoneOk);
+
auto sessionAnother = client.GetSession().GetValueSync().GetSession();
auto idAnother = sessionAnother.GetId();
UNIT_ASSERT(id != idAnother);
@@ -1554,11 +1559,82 @@ Y_UNIT_TEST_SUITE(KqpPg) {
auto resultSelectAnother = sessionAnother.ExecuteQuery(
querySelectAnother, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!resultSelectAnother.IsSuccess());
+ }
+
+ Y_UNIT_TEST(TempTablesDrop) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr(
+ serverSettings.SetWithSampleTables(false).SetEnableTempTables(true));
+ auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
+ auto client = kikimr.GetQueryClient();
+
+ auto session = client.GetSession().GetValueSync().GetSession();
+ auto id = session.GetId();
+
+ const auto queryCreate = Q_(R"(
+ --!syntax_pg
+ CREATE TEMP TABLE PgTemp (
+ key int2 PRIMARY KEY,
+ value int2))");
+
+ auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());
+
+ {
+ const auto querySelect = Q_(R"(
+ --!syntax_pg
+ SELECT * FROM PgTemp;
+ )");
+
+ auto resultSelect = session.ExecuteQuery(
+ querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
+ }
+
+ const auto queryDrop = Q_(R"(
+ --!syntax_pg
+ DROP TABLE PgTemp;
+ )");
+
+ auto resultDrop = session.ExecuteQuery(
+ queryDrop, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultDrop.IsSuccess(), resultDrop.GetIssues().ToString());
+
+ {
+ const auto querySelect = Q_(R"(
+ --!syntax_pg
+ SELECT * FROM PgTemp;
+ )");
+
+ auto resultSelect = session.ExecuteQuery(
+ querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT(!resultSelect.IsSuccess());
+ }
bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);
+
+ auto sessionAnother = client.GetSession().GetValueSync().GetSession();
+ auto idAnother = sessionAnother.GetId();
+ UNIT_ASSERT(id != idAnother);
+
+ {
+ const auto querySelect = Q_(R"(
+ --!syntax_pg
+ SELECT * FROM PgTemp;
+ )");
+
+ auto resultSelect = sessionAnother.ExecuteQuery(
+ querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT(!resultSelect.IsSuccess());
+ }
}
Y_UNIT_TEST(ValuesInsert) {
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 18f0baf3e8..386d4f7dcb 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -1169,8 +1169,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
- UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue) {
- return issue.GetMessage().Contains("not supported");
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_OPERATION, [](const NYql::TIssue& issue) {
+ return issue.GetMessage().Contains("can't be performed in data query");
}));
result = session.ExecuteDataQuery(Q_(R"(
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 8f73db8520..7efc758660 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -1177,6 +1177,38 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
settings.FetchToken(results.GetNextFetchToken());
}
}
+
+ Y_UNIT_TEST(QueryDropDdl) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE TestDropDdl (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )", TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetResultSets().empty());
+
+ result = db.ExecuteQuery(R"(
+ DROP TABLE TestDropDdl;
+ )", TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM TestDropDdl;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT(!result.IsSuccess());
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 3f44bc2481..3521bd1401 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -366,6 +366,7 @@ message TKqpPhyResult {
message TKqpSchemeOperation {
oneof Operation {
NKikimrSchemeOp.TModifyScheme CreateTable = 1;
+ NKikimrSchemeOp.TModifyScheme DropTable = 2;
}
}