aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-10-11 12:43:59 +0300
committerqrort <qrort@yandex-team.com>2023-10-11 13:25:54 +0300
commitc5f748cdb834e1f931d33ddca00cc8cd308985ec (patch)
treece6b4c51fd25b3a56a19eb912a2006e2798f33de
parent62e430b2b3432e4cc638478a9adc56bf739ed1b7 (diff)
downloadydb-c5f748cdb834e1f931d33ddca00cc8cd308985ec.tar.gz
KIKIMR-19330: Drop table if exists in query service
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp13
-rw-r--r--ydb/core/kqp/gateway/actors/scheme.h27
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp8
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h7
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp47
-rw-r--r--ydb/core/protos/flat_scheme_op.proto2
9 files changed, 96 insertions, 16 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index f1a86f3fb84..6ea099cdfac 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -118,7 +118,16 @@ public:
}
auto promise = NewPromise<IKqpGateway::TGenericResult>();
- IActor* requestHandler = new TSchemeOpRequestHandler(ev.Release(), promise, true);
+
+ bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().HasSuccessOnNotExist()
+ ? ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist()
+ : false;
+ IActor* requestHandler = new TSchemeOpRequestHandler(
+ ev.Release(),
+ promise,
+ true,
+ successOnNotExist
+ );
RegisterWithSameMailbox(requestHandler);
auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem;
@@ -126,7 +135,7 @@ public:
promise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
auto ev = MakeHolder<TEvPrivate::TEvResult>();
ev->Result = future.GetValue();
-
+
actorSystem->Send(selfId, ev.Release());
});
diff --git a/ydb/core/kqp/gateway/actors/scheme.h b/ydb/core/kqp/gateway/actors/scheme.h
index 40fbaea5a1d..1ea2d8ccddc 100644
--- a/ydb/core/kqp/gateway/actors/scheme.h
+++ b/ydb/core/kqp/gateway/actors/scheme.h
@@ -27,6 +27,12 @@ public:
, FailedOnAlreadyExists(failedOnAlreadyExists)
{}
+ TSchemeOpRequestHandler(TRequest* request, NThreading::TPromise<TResult> promise, bool failedOnAlreadyExists, bool successOnNotExist)
+ : TBase(request, promise, {})
+ , FailedOnAlreadyExists(failedOnAlreadyExists)
+ , SuccessOnNotExist(successOnNotExist)
+ {}
+
void Bootstrap(const TActorContext& ctx) {
TActorId txproxy = MakeTxProxyID();
@@ -79,8 +85,8 @@ public:
(!FailedOnAlreadyExists && response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists))
{
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request"
- << ", TxId: " << response.GetTxId());
-
+ << ", TxId: " << response.GetTxId());
+
TResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));
@@ -98,9 +104,19 @@ public:
}
case TEvTxUserProxy::TResultStatus::ResolveError: {
- Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_SCHEME_ERROR,
- response.GetSchemeShardReason(), {}));
- this->Die(ctx);
+ if (response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPathDoesNotExist
+ && SuccessOnNotExist) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request: path does not exist,"
+ << "SuccessOnNotExist: true, TxId: " << response.GetTxId());
+ TResult result;
+ result.SetSuccess();
+ Promise.SetValue(std::move(result));
+ this->Die(ctx);
+ } else {
+ Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_SCHEME_ERROR,
+ response.GetSchemeShardReason(), {}));
+ this->Die(ctx);
+ }
return;
}
@@ -177,6 +193,7 @@ public:
private:
TActorId ShemePipeActorId;
bool FailedOnAlreadyExists = false;
+ bool SuccessOnNotExist = false;
};
} // namespace NKqp
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 478d9bb239a..c2e25e560df 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -849,14 +849,14 @@ public:
}
}
- TFuture<TGenericResult> DropTable(const TString& cluster, const TString& table) override {
+ TFuture<TGenericResult> DropTable(const TString& cluster, const NYql::TDropTableSettings& settings) override {
try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}
Ydb::Table::DropTableRequest dropTable;
- dropTable.set_path(table);
+ dropTable.set_path(settings.Table);
// FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency
using namespace NGRpcService;
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index d3e36b15194..8adbad2a830 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -571,10 +571,10 @@ public:
FORWARD_ENSURE_NO_PREPARE(RenameTable, src, dst, cluster);
}
- TFuture<TGenericResult> DropTable(const TString& cluster, const TString& table) override {
+ TFuture<TGenericResult> DropTable(const TString& cluster, const TDropTableSettings& settings) override {
CHECK_PREPARED_DDL(DropTable);
- auto metadata = SessionCtx->Tables().GetTable(cluster, table).Metadata;
+ auto metadata = SessionCtx->Tables().GetTable(cluster, settings.Table).Metadata;
std::pair<TString, TString> pathPair;
TString error;
@@ -597,7 +597,7 @@ public:
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx);
-
+ phyTx.MutableSchemeOperation()->MutableDropTable()->SetSuccessOnNotExist(settings.SuccessOnNotExist);
TGenericResult result;
result.SetSuccess();
dropPromise.SetValue(result);
@@ -610,7 +610,7 @@ public:
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
dropPromise.SetValue(errResult);
}
- return Gateway->DropTable(cluster, table);
+ return Gateway->DropTable(cluster, settings);
}
return dropPromise.GetFuture();
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 00e731f862f..81c21614d4a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1007,7 +1007,7 @@ public:
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
switch (tableTypeItem) {
case ETableType::Table:
- future = Gateway->DropTable(table.Metadata->Cluster, table.Metadata->Name);
+ future = Gateway->DropTable(table.Metadata->Cluster, TDropTableSettings{.Table = table.Metadata->Name, .SuccessOnNotExist = missingOk});
if (missingOk) {
future = future.Apply([](const NThreading::TFuture<IKikimrGateway::TGenericResult>& res) {
auto operationResult = res.GetValue();
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 8af5e497ca9..cb1dc64be33 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -604,6 +604,11 @@ struct TAlterTableStoreSettings {
TString TableStore;
};
+struct TDropTableSettings {
+ TString Table;
+ bool SuccessOnNotExist;
+};
+
struct TDropTableStoreSettings {
TString TableStore;
};
@@ -761,7 +766,7 @@ public:
virtual NThreading::TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) = 0;
- virtual NThreading::TFuture<TGenericResult> DropTable(const TString& cluster, const TString& table) = 0;
+ virtual NThreading::TFuture<TGenericResult> DropTable(const TString& cluster, const TDropTableSettings& settings) = 0;
virtual NThreading::TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) = 0;
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 39d942af588..87b60e866d1 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -120,7 +120,7 @@ void TestLoadTableMetadataCommon(TIntrusivePtr<IKikimrGateway> gateway) {
}
void TestDropTableCommon(TIntrusivePtr<IKikimrGateway> gateway) {
- auto responseFuture = gateway->DropTable(TestCluster, "/Root/Test/UserTable");
+ auto responseFuture = gateway->DropTable(TestCluster, TDropTableSettings{.Table = "/Root/Test/UserTable"});
responseFuture.Wait();
auto response = responseFuture.GetValue();
response.Issues().PrintTo(Cerr);
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 6dfe85181d1..e7200202f8c 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -2273,6 +2273,53 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}
+ Y_UNIT_TEST(DropTableIfExists_GenericQuery) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting})
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto result = db.ExecuteQuery(R"(
+ DROP TABLE IF EXISTS test;
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE test (
+ id int4 primary key
+ );
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ DROP TABLE IF EXISTS test;
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot find table 'db.[/Root/test]'"));
+ }
+ }
+
Y_UNIT_TEST(JoinWithQueryService) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto client = kikimr.GetTableClient();
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 42d93ab4c1a..f6ece2ed23c 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1491,6 +1491,8 @@ message TModifyScheme {
optional TDropBlockStoreVolume DropBlockStoreVolume = 60;
optional NKikimrIndexBuilder.TColumnBuildSettings InitiateColumnBuild = 61;
+
+ optional bool SuccessOnNotExist = 62;
}
// "Script", used by client to parse text files with multiple DDL commands