aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-02-09 10:08:40 +0300
committerhcpp <hcpp@ydb.tech>2023-02-09 10:08:40 +0300
commitbbb79f5a581e9a3e2111f2823bcf16288e60e98c (patch)
tree8486e266d4542eb3bdb81d4a2728add9ef07bfec
parentaf24f1c243267a93328f66b5b3d67c5473e74f5b (diff)
downloadydb-bbb79f5a581e9a3e2111f2823bcf16288e60e98c.tar.gz
external tables has been supported for kqp gateway
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp98
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h23
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp58
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp26
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h8
5 files changed, 210 insertions, 3 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 82c455a92e..a46e2fbfa3 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1242,6 +1242,84 @@ public:
}
}
+ TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
+ const NYql::TCreateExternalTableSettings& settings,
+ bool createDir) override {
+ using TRequest = TEvTxUserProxy::TEvProposeTransaction;
+
+ try {
+ if (!CheckCluster(cluster)) {
+ return InvalidCluster<TGenericResult>(cluster);
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!GetPathPair(settings.ExternalTable, pathPair, error, createDir)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ auto ev = MakeHolder<TRequest>();
+ ev->Record.SetDatabaseName(Database);
+ if (UserToken) {
+ ev->Record.SetUserToken(UserToken->Serialized);
+ }
+ auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
+ schemeTx.SetWorkingDir(pathPair.first);
+ schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
+
+ NKikimrSchemeOp::TExternalTableDescription& tableDesc = *schemeTx.MutableCreateExternalTable();
+ FillCreateExternalTableColumnDesc(tableDesc, pathPair.second, settings);
+ return SendSchemeRequest(ev.Release());
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
+ TFuture<TGenericResult> AlterExternalTable(const TString& cluster,
+ const NYql::TAlterExternalTableSettings& settings) override {
+ Y_UNUSED(cluster, settings);
+ return MakeErrorFuture<TGenericResult>(std::make_exception_ptr(yexception() << "The alter is not supported for the external table"));
+ }
+
+ TFuture<TGenericResult> DropExternalTable(const TString& cluster,
+ const NYql::TDropExternalTableSettings& settings) override {
+ using TRequest = TEvTxUserProxy::TEvProposeTransaction;
+
+ try {
+ if (!CheckCluster(cluster)) {
+ return InvalidCluster<TGenericResult>(cluster);
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!GetPathPair(settings.ExternalTable, pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ auto ev = MakeHolder<TRequest>();
+ ev->Record.SetDatabaseName(Database);
+ if (UserToken) {
+ ev->Record.SetUserToken(UserToken->Serialized);
+ }
+
+ auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
+ schemeTx.SetWorkingDir(pathPair.first);
+ schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable);
+
+ NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
+ drop.SetName(pathPair.second);
+ return SendSchemeRequest(ev.Release());
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
@@ -2096,6 +2174,26 @@ private:
schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
}
+ static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalableDesc,
+ const TString& name,
+ const NYql::TCreateExternalTableSettings& settings)
+ {
+ externalableDesc.SetName(name);
+ externalableDesc.SetDataSourcePath(settings.DataSourcePath);
+ externalableDesc.SetLocation(settings.Location);
+
+ Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size());
+ for (const auto& name : settings.ColumnOrder) {
+ auto columnIt = settings.Columns.find(name);
+ Y_ENSURE(columnIt != settings.Columns.end());
+
+ TColumnDescription& columnDesc = *externalableDesc.AddColumns();
+ columnDesc.SetName(columnIt->second.Name);
+ columnDesc.SetType(columnIt->second.Type);
+ columnDesc.SetNotNull(columnIt->second.NotNull);
+ }
+ }
+
static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) {
if (!params) {
return;
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 35010a5a51..b8688ebe17 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -499,6 +499,23 @@ struct TDropTableStoreSettings {
TString TableStore;
};
+struct TCreateExternalTableSettings {
+ TString ExternalTable;
+ TString DataSourcePath;
+ TString Location;
+ TVector<TString> ColumnOrder;
+ TMap<TString, TKikimrColumnMetadata> Columns;
+ TVector<std::pair<TString, TString>> SourceTypeParameters;
+};
+
+struct TAlterExternalTableSettings {
+ TString ExternalTable;
+};
+
+struct TDropExternalTableSettings {
+ TString ExternalTable;
+};
+
struct TKikimrListPathItem {
TKikimrListPathItem(TString name, bool isDirectory) {
Name = name;
@@ -654,6 +671,12 @@ public:
virtual NThreading::TFuture<TGenericResult> DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0;
+ virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> AlterExternalTable(const TString& cluster, const TAlterExternalTableSettings& settings) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> DropExternalTable(const TString& cluster, const TDropExternalTableSettings& settings) = 0;
+
virtual TVector<TString> GetCollectedSchemeData() = 0;
public:
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 90f34067d4..b78d5fb48a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -1,5 +1,5 @@
-#include <ydb/core/client/minikql_result_lib/converter.h>
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
+#include <ydb/core/client/minikql_result_lib/converter.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
@@ -262,6 +262,45 @@ void TestDropTableCommon(TIntrusivePtr<IKikimrGateway> gateway) {
UNIT_ASSERT(!loadResponse.Metadata->DoesExist);
}
+void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) {
+ NYql::TCreateExternalTableSettings settings;
+
+ settings.ExternalTable = path;
+
+ settings.Columns.insert(std::make_pair("Column1", TKikimrColumnMetadata{"Column1", 0, "Uint32", false}));
+ settings.ColumnOrder.push_back("Column1");
+
+ settings.Columns.insert(std::make_pair("Column2", TKikimrColumnMetadata{"Column2", 0, "String", false}));
+ settings.ColumnOrder.push_back("Column2");
+
+ auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true);
+ responseFuture.Wait();
+ auto response = responseFuture.GetValue();
+ response.Issues().PrintTo(Cerr);
+
+ UNIT_ASSERT_C(response.Success(), response.Issues().ToString());
+
+ auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalTable = externalTableDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalTable);
+ UNIT_ASSERT(externalTable.ExternalTableInfo);
+ UNIT_ASSERT_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 2);
+}
+
+void TestDropExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) {
+
+ auto responseFuture = gateway->DropExternalTable(TestCluster, TDropExternalTableSettings{.ExternalTable=path});
+ responseFuture.Wait();
+ auto response = responseFuture.GetValue();
+ response.Issues().PrintTo(Cerr);
+ UNIT_ASSERT(response.Success());
+
+ auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalTable = externalTableDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1);
+ UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
+}
+
} // namespace
@@ -329,6 +368,23 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
CreateSampleTables(kikimr);
TestCreateTableCommon(GetIcGateway(kikimr.GetTestServer()), kikimr.GetTestClient(), true, Nothing(), true);
}
+
+ Y_UNIT_TEST(TestCreateExternalTable) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
+ }
+
+ Y_UNIT_TEST(TestCreateSameExternalTable) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
+ TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
+ }
+
+ Y_UNIT_TEST(TestDropExternalTable) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
+ TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
+ }
}
} // namespace NYql
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index ae3826794d..c5fbccf9f6 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -945,5 +945,31 @@ void InitRoot(Tests::TServer::TPtr server, TActorId sender) {
server->SetupRootStoragePools(sender);
}
+THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender,
+ const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op)
+{
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet;
+ using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult;
+
+ auto request = MakeHolder<TNavigate>();
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Path = SplitPath(path);
+ entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
+ entry.Operation = op;
+ entry.ShowPrivatePath = true;
+ runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release())));
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get());
+
+ auto* response = ev->Get()->Request.Release();
+ UNIT_ASSERT(response);
+ UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1);
+
+ return THolder(response);
+}
+
} // namspace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index d9a7ef2b94..c8c420833b 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -2,11 +2,12 @@
#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
-#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
#include <library/cpp/yson/node/node_io.h>
@@ -258,5 +259,8 @@ void WaitForKqpProxyInit(const NYdb::TDriver& driver);
void InitRoot(Tests::TServer::TPtr server, TActorId sender);
+THolder<NKikimr::NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender,
+ const TString& path, NKikimr::NSchemeCache::TSchemeCacheNavigate::EOp op);
+
} // namespace NKqp
} // namespace NKikimr