diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-08-30 22:26:53 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-08-30 23:00:53 +0300 |
commit | ab38bfd73cf3e132ca92c4cd185599e8b018683c (patch) | |
tree | 45619117f72df8688ad220c0ccd55634944128b9 | |
parent | 39994a5188cbe7a4d0ff1a6513f1556f75540fe8 (diff) | |
download | ydb-ab38bfd73cf3e132ca92c4cd185599e8b018683c.tar.gz |
KIKIMR-19155: Add drop table to prepare ddl
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 43 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 61 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 78 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 32 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 |
7 files changed, 198 insertions, 47 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 3cec80e4ce..39a4106800 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -55,17 +55,27 @@ public: } const auto& schemeOp = PhyTx->GetSchemeOperation(); - auto modifyScheme = schemeOp.GetCreateTable(); - - if (Temporary) { - auto* createTable = modifyScheme.MutableCreateTable(); - createTable->SetName(createTable->GetName() + SessionId); - createTable->SetPath(createTable->GetPath() + SessionId); - } switch (schemeOp.GetOperationCase()) { - case NKqpProto::TKqpSchemeOperation::kCreateTable: + case NKqpProto::TKqpSchemeOperation::kCreateTable: { + auto modifyScheme = schemeOp.GetCreateTable(); + if (Temporary) { + auto* createTable = modifyScheme.MutableCreateTable(); + createTable->SetName(createTable->GetName() + SessionId); + createTable->SetPath(createTable->GetPath() + SessionId); + } ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; + } + + case NKqpProto::TKqpSchemeOperation::kDropTable: { + auto modifyScheme = schemeOp.GetDropTable(); + if (Temporary) { + auto* dropTable = modifyScheme.MutableDrop(); + dropTable->SetName(dropTable->GetName() + SessionId); + } + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } default: InternalError(TStringBuilder() << "Unexpected scheme operation: " diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 69f74faccc..6cf615ab50 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -553,7 +553,48 @@ public: } TFuture<TGenericResult> DropTable(const TString& cluster, const TString& table) override { - FORWARD_ENSURE_NO_PREPARE(DropTable, cluster, table); + CHECK_PREPARED_DDL(DropTable); + + auto metadata = SessionCtx->Tables().GetTable(cluster, table).Metadata; + + std::pair<TString, TString> pathPair; + TString error; + if (!SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + + auto temporary = metadata->Temporary; + auto dropPromise = NewPromise<TGenericResult>(); + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); + schemeTx.SetWorkingDir(pathPair.first); + + auto* drop = schemeTx.MutableDrop(); + drop->SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + dropPromise.SetValue(result); + } else { + if (temporary) { + auto code = Ydb::StatusIds::BAD_REQUEST; + auto error = TStringBuilder() << "Not allowed to drop temp table"; + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + dropPromise.SetValue(errResult); + } + return Gateway->DropTable(cluster, table); + } + + return dropPromise.GetFuture(); } TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index bb70156364..a1c5c46d3b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -964,10 +964,6 @@ public: } if (auto maybeDrop = TMaybeNode<TKiDropTable>(input)) { - if (!EnsureNotPrepare("DROP TABLE", input->Pos(), SessionCtx->Query(), ctx)) { - return SyncError(); - } - auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { return SyncStatus(requireStatus); @@ -997,40 +993,35 @@ public: return SyncError(); } - bool prepareOnly = SessionCtx->Query().PrepareOnly; bool missingOk = (maybeDrop.MissingOk().Cast().Value() == "1"); NThreading::TFuture<IKikimrGateway::TGenericResult> future; - if (prepareOnly) { - future = CreateDummySuccess(); - } else { - switch (tableTypeItem) { - case ETableType::Table: - future = Gateway->DropTable(table.Metadata->Cluster, table.Metadata->Name); - if (missingOk) { - future = future.Apply([](const NThreading::TFuture<IKikimrGateway::TGenericResult>& res) { - auto operationResult = res.GetValue(); - bool pathNotExist = false; - for (const auto& issue : operationResult.Issues()) { - WalkThroughIssues(issue, false, [&pathNotExist](const NYql::TIssue& issue, int level) { - Y_UNUSED(level); - pathNotExist |= (issue.GetCode() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST); - }); - } - return pathNotExist ? CreateDummySuccess() : res; - }); - } - break; - case ETableType::TableStore: - future = Gateway->DropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast())); - break; - case ETableType::ExternalTable: - future = Gateway->DropExternalTable(cluster, ParseDropExternalTableSettings(maybeDrop.Cast())); - break; - case ETableType::Unknown: - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Unsupported table type " << tableTypeString)); - return SyncError(); - } + switch (tableTypeItem) { + case ETableType::Table: + future = Gateway->DropTable(table.Metadata->Cluster, table.Metadata->Name); + if (missingOk) { + future = future.Apply([](const NThreading::TFuture<IKikimrGateway::TGenericResult>& res) { + auto operationResult = res.GetValue(); + bool pathNotExist = false; + for (const auto& issue : operationResult.Issues()) { + WalkThroughIssues(issue, false, [&pathNotExist](const NYql::TIssue& issue, int level) { + Y_UNUSED(level); + pathNotExist |= (issue.GetCode() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST); + }); + } + return pathNotExist ? CreateDummySuccess() : res; + }); + } + break; + case ETableType::TableStore: + future = Gateway->DropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast())); + break; + case ETableType::ExternalTable: + future = Gateway->DropExternalTable(cluster, ParseDropExternalTableSettings(maybeDrop.Cast())); + break; + case ETableType::Unknown: + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Unsupported table type " << tableTypeString)); + return SyncError(); } return WrapFuture(future, diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index bf1d4ad267..c2e4131a3f 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1460,7 +1460,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { Y_UNIT_TEST(CreateTempTable) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);; auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetAppConfig(appConfig) @@ -1542,6 +1542,11 @@ Y_UNIT_TEST_SUITE(KqpPg) { querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString()); + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); + + UNIT_ASSERT(allDoneOk); + auto sessionAnother = client.GetSession().GetValueSync().GetSession(); auto idAnother = sessionAnother.GetId(); UNIT_ASSERT(id != idAnother); @@ -1554,11 +1559,82 @@ Y_UNIT_TEST_SUITE(KqpPg) { auto resultSelectAnother = sessionAnother.ExecuteQuery( querySelectAnother, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT(!resultSelectAnother.IsSuccess()); + } + + Y_UNIT_TEST(TempTablesDrop) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr( + serverSettings.SetWithSampleTables(false).SetEnableTempTables(true)); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + auto client = kikimr.GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto id = session.GetId(); + + const auto queryCreate = Q_(R"( + --!syntax_pg + CREATE TEMP TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString()); + + { + const auto querySelect = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto resultSelect = session.ExecuteQuery( + querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString()); + } + + const auto queryDrop = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto resultDrop = session.ExecuteQuery( + queryDrop, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultDrop.IsSuccess(), resultDrop.GetIssues().ToString()); + + { + const auto querySelect = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto resultSelect = session.ExecuteQuery( + querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!resultSelect.IsSuccess()); + } bool allDoneOk = true; NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); UNIT_ASSERT(allDoneOk); + + auto sessionAnother = client.GetSession().GetValueSync().GetSession(); + auto idAnother = sessionAnother.GetId(); + UNIT_ASSERT(id != idAnother); + + { + const auto querySelect = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto resultSelect = sessionAnother.ExecuteQuery( + querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!resultSelect.IsSuccess()); + } } Y_UNIT_TEST(ValuesInsert) { diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 18f0baf3e8..386d4f7dcb 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -1169,8 +1169,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) { )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue) { - return issue.GetMessage().Contains("not supported"); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_OPERATION, [](const NYql::TIssue& issue) { + return issue.GetMessage().Contains("can't be performed in data query"); })); result = session.ExecuteDataQuery(Q_(R"( diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 8f73db8520..7efc758660 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -1177,6 +1177,38 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { settings.FetchToken(results.GetNextFetchToken()); } } + + Y_UNIT_TEST(QueryDropDdl) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDropDdl ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetResultSets().empty()); + + result = db.ExecuteQuery(R"( + DROP TABLE TestDropDdl; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDropDdl; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 3f44bc2481..3521bd1401 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -366,6 +366,7 @@ message TKqpPhyResult { message TKqpSchemeOperation { oneof Operation { NKikimrSchemeOp.TModifyScheme CreateTable = 1; + NKikimrSchemeOp.TModifyScheme DropTable = 2; } } |