diff options
author | hcpp <hcpp@ydb.tech> | 2023-02-09 10:08:40 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-02-09 10:08:40 +0300 |
commit | bbb79f5a581e9a3e2111f2823bcf16288e60e98c (patch) | |
tree | 8486e266d4542eb3bdb81d4a2728add9ef07bfec | |
parent | af24f1c243267a93328f66b5b3d67c5473e74f5b (diff) | |
download | ydb-bbb79f5a581e9a3e2111f2823bcf16288e60e98c.tar.gz |
external tables has been supported for kqp gateway
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 98 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 23 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 58 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 8 |
5 files changed, 210 insertions, 3 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 82c455a92e..a46e2fbfa3 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1242,6 +1242,84 @@ public: } } + TFuture<TGenericResult> CreateExternalTable(const TString& cluster, + const NYql::TCreateExternalTableSettings& settings, + bool createDir) override { + using TRequest = TEvTxUserProxy::TEvProposeTransaction; + + try { + if (!CheckCluster(cluster)) { + return InvalidCluster<TGenericResult>(cluster); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!GetPathPair(settings.ExternalTable, pathPair, error, createDir)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + auto ev = MakeHolder<TRequest>(); + ev->Record.SetDatabaseName(Database); + if (UserToken) { + ev->Record.SetUserToken(UserToken->Serialized); + } + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable); + + NKikimrSchemeOp::TExternalTableDescription& tableDesc = *schemeTx.MutableCreateExternalTable(); + FillCreateExternalTableColumnDesc(tableDesc, pathPair.second, settings); + return SendSchemeRequest(ev.Release()); + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + + TFuture<TGenericResult> AlterExternalTable(const TString& cluster, + const NYql::TAlterExternalTableSettings& settings) override { + Y_UNUSED(cluster, settings); + return MakeErrorFuture<TGenericResult>(std::make_exception_ptr(yexception() << "The alter is not supported for the external table")); + } + + TFuture<TGenericResult> DropExternalTable(const TString& cluster, + const NYql::TDropExternalTableSettings& settings) override { + using TRequest = TEvTxUserProxy::TEvProposeTransaction; + + try { + if (!CheckCluster(cluster)) { + return InvalidCluster<TGenericResult>(cluster); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!GetPathPair(settings.ExternalTable, pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + auto ev = MakeHolder<TRequest>(); + ev->Record.SetDatabaseName(Database); + if (UserToken) { + ev->Record.SetUserToken(UserToken->Serialized); + } + + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable); + + NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop(); + drop.SetName(pathPair.second); + return SendSchemeRequest(ev.Release()); + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override { using TRequest = TEvTxUserProxy::TEvProposeTransaction; @@ -2096,6 +2174,26 @@ private: schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); } + static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalableDesc, + const TString& name, + const NYql::TCreateExternalTableSettings& settings) + { + externalableDesc.SetName(name); + externalableDesc.SetDataSourcePath(settings.DataSourcePath); + externalableDesc.SetLocation(settings.Location); + + Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size()); + for (const auto& name : settings.ColumnOrder) { + auto columnIt = settings.Columns.find(name); + Y_ENSURE(columnIt != settings.Columns.end()); + + TColumnDescription& columnDesc = *externalableDesc.AddColumns(); + columnDesc.SetName(columnIt->second.Name); + columnDesc.SetType(columnIt->second.Type); + columnDesc.SetNotNull(columnIt->second.NotNull); + } + } + static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) { if (!params) { return; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 35010a5a51..b8688ebe17 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -499,6 +499,23 @@ struct TDropTableStoreSettings { TString TableStore; }; +struct TCreateExternalTableSettings { + TString ExternalTable; + TString DataSourcePath; + TString Location; + TVector<TString> ColumnOrder; + TMap<TString, TKikimrColumnMetadata> Columns; + TVector<std::pair<TString, TString>> SourceTypeParameters; +}; + +struct TAlterExternalTableSettings { + TString ExternalTable; +}; + +struct TDropExternalTableSettings { + TString ExternalTable; +}; + struct TKikimrListPathItem { TKikimrListPathItem(TString name, bool isDirectory) { Name = name; @@ -654,6 +671,12 @@ public: virtual NThreading::TFuture<TGenericResult> DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir) = 0; + + virtual NThreading::TFuture<TGenericResult> AlterExternalTable(const TString& cluster, const TAlterExternalTableSettings& settings) = 0; + + virtual NThreading::TFuture<TGenericResult> DropExternalTable(const TString& cluster, const TDropExternalTableSettings& settings) = 0; + virtual TVector<TString> GetCollectedSchemeData() = 0; public: diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 90f34067d4..b78d5fb48a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -1,5 +1,5 @@ -#include <ydb/core/client/minikql_result_lib/converter.h> #include <ydb/core/client/minikql_compile/mkql_compile_service.h> +#include <ydb/core/client/minikql_result_lib/converter.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/gateway/kqp_metadata_loader.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> @@ -262,6 +262,45 @@ void TestDropTableCommon(TIntrusivePtr<IKikimrGateway> gateway) { UNIT_ASSERT(!loadResponse.Metadata->DoesExist); } +void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) { + NYql::TCreateExternalTableSettings settings; + + settings.ExternalTable = path; + + settings.Columns.insert(std::make_pair("Column1", TKikimrColumnMetadata{"Column1", 0, "Uint32", false})); + settings.ColumnOrder.push_back("Column1"); + + settings.Columns.insert(std::make_pair("Column2", TKikimrColumnMetadata{"Column2", 0, "String", false})); + settings.ColumnOrder.push_back("Column2"); + + auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true); + responseFuture.Wait(); + auto response = responseFuture.GetValue(); + response.Issues().PrintTo(Cerr); + + UNIT_ASSERT_C(response.Success(), response.Issues().ToString()); + + auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalTable = externalTableDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalTable); + UNIT_ASSERT(externalTable.ExternalTableInfo); + UNIT_ASSERT_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 2); +} + +void TestDropExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) { + + auto responseFuture = gateway->DropExternalTable(TestCluster, TDropExternalTableSettings{.ExternalTable=path}); + responseFuture.Wait(); + auto response = responseFuture.GetValue(); + response.Issues().PrintTo(Cerr); + UNIT_ASSERT(response.Success()); + + auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalTable = externalTableDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1); + UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); +} + } // namespace @@ -329,6 +368,23 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { CreateSampleTables(kikimr); TestCreateTableCommon(GetIcGateway(kikimr.GetTestServer()), kikimr.GetTestClient(), true, Nothing(), true); } + + Y_UNIT_TEST(TestCreateExternalTable) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); + } + + Y_UNIT_TEST(TestCreateSameExternalTable) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); + TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); + } + + Y_UNIT_TEST(TestDropExternalTable) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); + TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); + } } } // namespace NYql diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index ae3826794d..c5fbccf9f6 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -945,5 +945,31 @@ void InitRoot(Tests::TServer::TPtr server, TActorId sender) { server->SetupRootStoragePools(sender); } +THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, + const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op) +{ + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet; + using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult; + + auto request = MakeHolder<TNavigate>(); + auto& entry = request->ResultSet.emplace_back(); + entry.Path = SplitPath(path); + entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; + entry.Operation = op; + entry.ShowPrivatePath = true; + runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release()))); + + auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()); + + auto* response = ev->Get()->Request.Release(); + UNIT_ASSERT(response); + UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1); + + return THolder(response); +} + } // namspace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index d9a7ef2b94..c8c420833b 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -2,11 +2,12 @@ #include <ydb/core/testlib/test_client.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/lib/yson_value/ydb_yson_value.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> -#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> -#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> #include <library/cpp/yson/node/node_io.h> @@ -258,5 +259,8 @@ void WaitForKqpProxyInit(const NYdb::TDriver& driver); void InitRoot(Tests::TServer::TPtr server, TActorId sender); +THolder<NKikimr::NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, + const TString& path, NKikimr::NSchemeCache::TSchemeCacheNavigate::EOp op); + } // namespace NKqp } // namespace NKikimr |