summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp16
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp85
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp8
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp141
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/library/yql/sql/v1/SQLv1.g.in12
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format.cpp14
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp12
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp112
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp134
10 files changed, 517 insertions, 18 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 7bd1a951b62..53afb8e8a7f 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -547,14 +547,18 @@ class TSchemeOpRequestHandler: public TRequestHandlerBase<
TEvTxUserProxy::TEvProposeTransactionStatus,
IKqpGateway::TGenericResult>
{
+ bool FailedOnAlreadyExists = false;
public:
using TBase = typename TSchemeOpRequestHandler::TBase;
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus;
using TResult = IKqpGateway::TGenericResult;
- TSchemeOpRequestHandler(TRequest* request, TPromise<TResult> promise)
- : TBase(request, promise, {}) {}
+ TSchemeOpRequestHandler(TRequest* request, TPromise<TResult> promise, bool failedOnAlreadyExists)
+ : TBase(request, promise, {})
+ , FailedOnAlreadyExists(failedOnAlreadyExists)
+ {}
+
void Bootstrap(const TActorContext& ctx) {
TActorId txproxy = MakeTxProxyID();
@@ -604,7 +608,7 @@ public:
case TEvTxUserProxy::TResultStatus::ExecComplete: {
if (response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess ||
- response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists)
+ (!FailedOnAlreadyExists && response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists))
{
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request"
<< ", TxId: " << response.GetTxId());
@@ -1392,7 +1396,7 @@ public:
NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *schemeTx.MutableCreateExternalDataSource();
FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings);
- return SendSchemeRequest(ev.Release());
+ return SendSchemeRequest(ev.Release(), true);
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
@@ -2128,10 +2132,10 @@ private:
return promise.GetFuture();
}
- TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request)
+ TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, bool failedOnAlreadyExists = false)
{
auto promise = NewPromise<TGenericResult>();
- IActor* requestHandler = new TSchemeOpRequestHandler(request, promise);
+ IActor* requestHandler = new TSchemeOpRequestHandler(request, promise, failedOnAlreadyExists);
RegisterActor(requestHandler);
return promise.GetFuture();
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 60e158cbcb1..47ebe3c8ba8 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -128,6 +128,33 @@ namespace {
return dropGroupSettings;
}
+ TString GetOrDefault(const std::map<TString, TString>& container, const TString& key, const TString& defaultValue = TString{}) {
+ auto it = container.find(key);
+ return it == container.end() ? defaultValue : it->second;
+ }
+
+ TCreateExternalDataSourceSettings ParseCreateExternalDataSourceSettings(const TCreateObjectSettings& settings) {
+ TCreateExternalDataSourceSettings out;
+ out.ExternalDataSource = settings.GetObjectId();
+ out.SourceType = GetOrDefault(settings.GetFeatures(), "source_type");
+ out.AuthMethod = GetOrDefault(settings.GetFeatures(), "auth_method");
+ out.Installation = GetOrDefault(settings.GetFeatures(), "installation");
+ out.Location = GetOrDefault(settings.GetFeatures(), "location");
+ return out;
+ }
+
+ TAlterExternalDataSourceSettings ParseAlterExternalDataSourceSettings(const TAlterObjectSettings& settings) {
+ TAlterExternalDataSourceSettings out;
+ out.ExternalDataSource = settings.GetObjectId();
+ return out;
+ }
+
+ TDropExternalDataSourceSettings ParseDropExternalDataSourceSettings(const TDropObjectSettings& settings) {
+ TDropExternalDataSourceSettings out;
+ out.ExternalDataSource = settings.GetObjectId();
+ return out;
+ }
+
TCreateTableStoreSettings ParseCreateTableStoreSettings(TKiCreateTable create, const TTableSettings& settings) {
TCreateTableStoreSettings out;
out.TableStore = TString(create.Table());
@@ -395,9 +422,9 @@ template <class TKiObject, class TSettings>
class TObjectModifierTransformer {
private:
TIntrusivePtr<IKikimrGateway> Gateway;
- TIntrusivePtr<TKikimrSessionContext> SessionCtx;
TString ActionInfo;
protected:
+ TIntrusivePtr<TKikimrSessionContext> SessionCtx;
virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TSettings& settings) = 0;
TIntrusivePtr<IKikimrGateway> GetGateway() const {
return Gateway;
@@ -405,8 +432,8 @@ protected:
public:
TObjectModifierTransformer(const TString& actionInfo, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx)
: Gateway(gateway)
- , SessionCtx(sessionCtx)
, ActionInfo(actionInfo)
+ , SessionCtx(sessionCtx)
{
}
@@ -471,6 +498,48 @@ public:
using TBase::TBase;
};
+class TCreateExternalDataSourceTransformer: public TObjectModifierTransformer<TKiCreateObject, TCreateObjectSettings> {
+private:
+ using TBase = TObjectModifierTransformer<TKiCreateObject, TCreateObjectSettings>;
+protected:
+ virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TCreateObjectSettings& settings) override {
+ if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
+ return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it"));
+ }
+ return GetGateway()->CreateExternalDataSource(cluster, ParseCreateExternalDataSourceSettings(settings), true);
+ }
+public:
+ using TBase::TBase;
+};
+
+class TAlterExternalDataSourceTransformer: public TObjectModifierTransformer<TKiAlterObject, TAlterObjectSettings> {
+private:
+ using TBase = TObjectModifierTransformer<TKiAlterObject, TAlterObjectSettings>;
+protected:
+ virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TAlterObjectSettings& settings) override {
+ if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
+ return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it"));
+ }
+ return GetGateway()->AlterExternalDataSource(cluster, ParseAlterExternalDataSourceSettings(settings));
+ }
+public:
+ using TBase::TBase;
+};
+
+class TDropExternalDataSourceTransformer: public TObjectModifierTransformer<TKiDropObject, TDropObjectSettings> {
+private:
+ using TBase = TObjectModifierTransformer<TKiDropObject, TDropObjectSettings>;
+protected:
+ virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TDropObjectSettings& settings) override {
+ if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
+ return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it"));
+ }
+ return GetGateway()->DropExternalDataSource(cluster, ParseDropExternalDataSourceSettings(settings));
+ }
+public:
+ using TBase::TBase;
+};
+
class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSinkCallableExecutionTransformer> {
public:
TKiSinkCallableExecutionTransformer(
@@ -1149,15 +1218,21 @@ public:
}
if (auto kiObject = TMaybeNode<TKiCreateObject>(input)) {
- return TCreateObjectTransformer("CREATE OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
+ return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE"
+ ? TCreateExternalDataSourceTransformer("CREATE EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx)
+ : TCreateObjectTransformer("CREATE OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
}
if (auto kiObject = TMaybeNode<TKiAlterObject>(input)) {
- return TAlterObjectTransformer("ALTER OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
+ return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE"
+ ? TAlterExternalDataSourceTransformer("ALTER EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx)
+ : TAlterObjectTransformer("ALTER OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
}
if (auto kiObject = TMaybeNode<TKiDropObject>(input)) {
- return TDropObjectTransformer("DROP OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
+ return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE"
+ ? TDropExternalDataSourceTransformer("DROP EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx)
+ : TDropObjectTransformer("DROP OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx);
}
if (auto maybeCreateGroup = TMaybeNode<TKiCreateGroup>(input)) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 62e117c6a44..bb7aa9ce1de 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -335,10 +335,10 @@ void TestDropExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikim
response.Issues().PrintTo(Cerr);
UNIT_ASSERT(response.Success());
- auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
- const auto& externalTable = externalTableDesc->ResultSet.at(0);
- UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1);
- UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
+ auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalDataSourceDesc->ErrorCount, 1);
+ UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
}
} // namespace
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 64b117034f1..003d2d903b5 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -3595,6 +3595,147 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
session.Close().GetValueSync();
}
+
+ Y_UNIT_TEST(CreateExternalDataSource) {
+ TKikimrRunner kikimr;
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+ auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource);
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo);
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), "");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "my-bucket");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(externalDataSourceName).back());
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone());
+ }
+
+ Y_UNIT_TEST(DisableCreateExternalDataSource) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::INTERNAL_ERROR);
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External data sources are disabled. Please contact your system administrator to enable it");
+ }
+
+ Y_UNIT_TEST(CreateExternalDataSourceValidation) {
+ TKikimrRunner kikimr;
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="UNKNOWN"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Authorization method not specified");
+ }
+
+ Y_UNIT_TEST(DropExternalDataSource) {
+ TKikimrRunner kikimr;
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ {
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = TStringBuilder() << R"( DROP EXTERNAL DATA SOURCE `)" << externalDataSourceName << "`";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+ auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalDataSourceDesc->ErrorCount, 1);
+ UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
+ }
+
+ Y_UNIT_TEST(DisableDropExternalDataSource) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ auto query = TStringBuilder() << R"( DROP EXTERNAL DATA SOURCE `)" << externalDataSourceName << "`";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External data sources are disabled. Please contact your system administrator to enable it");
+ }
+
+ Y_UNIT_TEST(DoubleCreateExternalDataSource) {
+ TKikimrRunner kikimr;
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ TString externalDataSourceName = "/Root/ExternalDataSource";
+ {
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+ auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource);
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo);
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), "");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "my-bucket");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(externalDataSourceName).back());
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone());
+ }
+
+ {
+ auto query = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Check failed: path: '/Root/ExternalDataSource', error: path exist");
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 05bd98b896b..5fb6fe42db8 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -769,6 +769,7 @@ message TFeatureFlags {
optional bool EnableDataShardVolatileTransactions = 85 [default = false];
optional bool EnableTopicServiceTx = 86 [default = false];
optional bool EnableLLVMCache = 87 [default = false];
+ optional bool EnableExternalDataSources = 88 [default = false];
}
diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in
index 21ffa80530b..124c48827f9 100644
--- a/ydb/library/yql/sql/v1/SQLv1.g.in
+++ b/ydb/library/yql/sql/v1/SQLv1.g.in
@@ -47,6 +47,8 @@ sql_stmt_core:
| create_object_stmt
| alter_object_stmt
| drop_object_stmt
+ | create_external_data_source_stmt
+ | drop_external_data_source_stmt
;
expr:
@@ -453,6 +455,12 @@ values_source_row: LPAREN expr_list RPAREN;
simple_values_source: expr_list | select_stmt;
+create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE object_ref
+ with_table_settings
+;
+
+drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE object_ref;
+
create_object_stmt: CREATE OBJECT object_ref
LPAREN TYPE object_type_ref RPAREN
create_object_features?
@@ -917,6 +925,7 @@ keyword_compat: (
| CREATE
| CROSS
| CURRENT
+ | DATA
| DATABASE
| DECIMAL
| DECLARE
@@ -1022,6 +1031,7 @@ keyword_compat: (
| SAVEPOINT
| SEMI
| SETS
+ | SOURCE
| SUBQUERY
| SYMBOLS
| SYNC
@@ -1195,6 +1205,7 @@ CURRENT: C U R R E N T;
CURRENT_TIME: C U R R E N T '_' T I M E;
CURRENT_DATE: C U R R E N T '_' D A T E;
CURRENT_TIMESTAMP: C U R R E N T '_' T I M E S T A M P;
+DATA: D A T A;
DATABASE: D A T A B A S E;
DECIMAL: D E C I M A L;
DECLARE: D E C L A R E;
@@ -1336,6 +1347,7 @@ SELECT: S E L E C T;
SEMI: S E M I;
SET: S E T;
SETS: S E T S;
+SOURCE: S O U R C E;
STREAM: S T R E A M;
STRUCT: S T R U C T;
SUBQUERY: S U B Q U E R Y;
diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp
index fbe07af20d7..eabca3cec70 100644
--- a/ydb/library/yql/sql/v1/format/sql_format.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format.cpp
@@ -988,6 +988,18 @@ private:
VisitAllFields(TRule_drop_object_stmt::GetDescriptor(), msg);
}
+ void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) {
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitDropExternalDataSource(const TRule_drop_external_data_source_stmt& msg) {
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg);
+ }
+
void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) {
for (int i = 0; i < descr->field_count(); ++i) {
const NProtoBuf::FieldDescriptor* fd = descr->field(i);
@@ -1845,6 +1857,8 @@ TStaticData::TStaticData()
{TRule_create_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateObject)},
{TRule_alter_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitAlterObject)},
{TRule_drop_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropObject)},
+ {TRule_create_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateExternalDataSource)},
+ {TRule_drop_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropExternalDataSource)},
})
{
// ensure that all statements has a visitor
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index 53e82aa2ea5..2866b0d4528 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -273,6 +273,18 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
setup.Run(cases);
}
+ Y_UNIT_TEST(ExternalDataSourceOperations) {
+ TCases cases = {
+ {"creAte exTernAl daTa SouRce usEr With (a = \"b\")",
+ "CREATE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"},
+ {"dRop exTerNal Data SouRce usEr",
+ "DROP EXTERNAL DATA SOURCE usEr;\n"},
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+ }
+
Y_UNIT_TEST(TypeSelection) {
TCases cases = {
{"Select tYpe.* frOm Table tYpe",
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index d1fba615c2c..8fd084af800 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -28,6 +28,7 @@
#include <library/cpp/json/json_reader.h>
#include <ydb/library/yql/utils/utf8.h>
+#include <ydb/library/yql/utils/yql_paths.h>
#include <google/protobuf/repeated_field.h>
@@ -818,6 +819,7 @@ protected:
bool alter, bool reset);
bool StoreTableSettingsEntry(const TIdentifier& id, const TRule_table_setting_value& value, TTableSettings& settings,
bool alter = false);
+ bool StoreDataSourceSettingsEntry(const TIdentifier& id, const TRule_table_setting_value* value, std::map<TString, TDeferredAtom>& result);
bool ResetTableSettingsEntry(const TIdentifier& id, TTableSettings& settings);
TNodePtr TypeSimple(const TRule_type_name_simple& node, bool onlyDataAllowed);
@@ -849,6 +851,7 @@ protected:
bool BindParameterClause(const TRule_bind_parameter& node, TDeferredAtom& result);
bool ObjectFeatureValueClause(const TRule_object_feature_value & node, TDeferredAtom & result);
bool ParseObjectFeatures(std::map<TString, TDeferredAtom> & result, const TRule_object_features & features);
+ bool ParseExternalDataSourceSettings(std::map<TString, TDeferredAtom> & result, const TRule_with_table_settings & settings);
bool RoleNameClause(const TRule_role_name& node, TDeferredAtom& result, bool allowSystemRoles);
bool RoleParameters(const TRule_create_user_option& node, TRoleParameters& result) ;
private:
@@ -2048,6 +2051,26 @@ namespace {
return true;
}
+ bool StoreString(const TRule_table_setting_value& from, TDeferredAtom& to, TContext& ctx, const TString& errorPrefix = {}) {
+ switch (from.Alt_case()) {
+ case TRule_table_setting_value::kAltTableSettingValue2: {
+ // STRING_VALUE
+ const TString stringValue(ctx.Token(from.GetAlt_table_setting_value2().GetToken1()));
+ auto unescaped = StringContent(ctx, ctx.Pos(), stringValue);
+ if (!unescaped) {
+ ctx.Error() << errorPrefix << " value cannot be unescaped";
+ return false;
+ }
+ to = TDeferredAtom(ctx.Pos(), unescaped->Content);
+ break;
+ }
+ default:
+ ctx.Error() << errorPrefix << " value should be a string literal";
+ return false;
+ }
+ return true;
+ }
+
bool StoreInt(const TRule_table_setting_value& from, TNodePtr& to, TContext& ctx) {
switch (from.Alt_case()) {
case TRule_table_setting_value::kAltTableSettingValue3: {
@@ -9191,7 +9214,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
case TRule_sql_stmt_core::kAltSqlStmtCore27:
{
- // create_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]);
+ // alter_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]);
auto& node = core.GetAlt_sql_stmt_core27().GetRule_alter_object_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
if (node.GetRule_object_ref3().HasBlock1()) {
@@ -9213,7 +9236,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
case TRule_sql_stmt_core::kAltSqlStmtCore28:
{
- // create_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]);
+ // drop_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]);
auto& node = core.GetAlt_sql_stmt_core28().GetRule_drop_object_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
if (node.GetRule_object_ref3().HasBlock1()) {
@@ -9235,6 +9258,43 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore29:
+ {
+ // create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE name WITH (k=v,...);
+ auto& node = core.GetAlt_sql_stmt_core29().GetRule_create_external_data_source_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref5().HasBlock1()) {
+ if (!ClusterExpr(node.GetRule_object_ref5().GetBlock1().GetRule_cluster_expr1(),
+ false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ const TString& objectId = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second;
+ std::map<TString, TDeferredAtom> kv;
+ if (!ParseExternalDataSourceSettings(kv, node.GetRule_with_table_settings6())) {
+ return false;
+ }
+
+ AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", std::move(kv), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore30:
+ {
+ // drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE name;
+ auto& node = core.GetAlt_sql_stmt_core30().GetRule_drop_external_data_source_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref5().HasBlock1()) {
+ if (!ClusterExpr(node.GetRule_object_ref5().GetBlock1().GetRule_cluster_expr1(),
+ false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ const TString& objectId = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", {}, context));
+ break;
+ }
default:
Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);
@@ -10799,4 +10859,52 @@ bool TSqlTranslation::ParseObjectFeatures(std::map<TString, TDeferredAtom>& resu
return true;
}
+bool TSqlTranslation::StoreDataSourceSettingsEntry(const TIdentifier& id, const TRule_table_setting_value* value, std::map<TString, TDeferredAtom>& result) {
+ YQL_ENSURE(value);
+
+ const TString key = to_lower(id.Name);
+ if (result.find(key) != result.end()) {
+ Ctx.Error() << to_upper(key) << " duplicate keys";
+ return false;
+ }
+
+ if (IsIn({"source_type", "installation", "location", "auth_method"}, key)) {
+ if (!StoreString(*value, result[key], Ctx, to_upper(key))) {
+ return false;
+ }
+ } else {
+ Ctx.Error() << "Unknown external data source setting: " << id.Name;
+ return false;
+ }
+ return true;
+}
+
+bool TSqlTranslation::ParseExternalDataSourceSettings(std::map<TString, TDeferredAtom>& result, const TRule_with_table_settings& settingsNode) {
+ const auto& firstEntry = settingsNode.GetRule_table_settings_entry3();
+ if (!StoreDataSourceSettingsEntry(IdEx(firstEntry.GetRule_an_id1(), *this), &firstEntry.GetRule_table_setting_value3(),
+ result)) {
+ return false;
+ }
+ for (auto& block : settingsNode.GetBlock4()) {
+ const auto& entry = block.GetRule_table_settings_entry2();
+ if (!StoreDataSourceSettingsEntry(IdEx(entry.GetRule_an_id1(), *this), &entry.GetRule_table_setting_value3(), result)) {
+ return false;
+ }
+ }
+
+ if (result.find("source_type") == result.end()) {
+ Ctx.Error() << "SOURCE_TYPE requires key";
+ return false;
+ }
+ if (result.find("auth_method") == result.end()) {
+ Ctx.Error() << "AUTH_METHOD requires key";
+ return false;
+ }
+ if (result.find("installation") == result.end() && result.find("location") == result.end()) {
+ Ctx.Error() << "INSTALLATION or LOCATION must be specified";
+ return false;
+ }
+ return true;
+}
+
} // namespace NSQLTranslationV1
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index ce96a14d2d5..e6fe98f3d9c 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -751,7 +751,6 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
- Cerr << line << Endl;
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("'features"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("'\"OVERRIDE\""));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject"));
@@ -4703,4 +4702,137 @@ Y_UNIT_TEST_SUITE(ExternalDeclares) {
UNIT_ASSERT_NO_DIFF(Err2Str(res),
"<main>:1:15: Error: Selecting data from monitoring source is not supported\n");
}
+
+ Y_UNIT_TEST(CreateExternalDataSource) {
+ NYql::TAstParseResult res = SqlToYql(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );
+ )");
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"NONE") '('"location" '"my-bucket") '('"source_type" '"ObjectStorage"))#");
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(CreateExternalDataSourceWithTablePrefix) {
+ NYql::TAstParseResult res = SqlToYql(R"(
+ USE plato;
+ pragma TablePathPrefix='/aba';
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );
+ )");
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyDataSource");
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(CreateExternalDataSourceWithBadArguments) {
+ ExpectFailWithError(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource;
+ )" , "<main>:3:56: Error: Unexpected token ';' : syntax error...\n\n");
+
+ ExpectFailWithError(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );
+ )" , "<main>:5:33: Error: SOURCE_TYPE requires key\n");
+
+ ExpectFailWithError(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ SOURCE_TYPE="ObjectStorage",
+ AUTH_METHOD="NONE"
+ );
+ )" , "<main>:5:33: Error: INSTALLATION or LOCATION must be specified\n");
+
+
+ ExpectFailWithError(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket"
+ );
+ )" , "<main>:5:30: Error: AUTH_METHOD requires key\n");
+
+ ExpectFailWithError(R"(
+ USE plato;
+ CREATE EXTERNAL DATA SOURCE MyDataSource WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE",
+ OTHER="VALUE"
+ );
+ )" , "<main>:7:21: Error: Unknown external data source setting: OTHER\n");
+ }
+
+ Y_UNIT_TEST(DropExternalDataSourceWithTablePrefix) {
+ NYql::TAstParseResult res = SqlToYql(R"(
+ USE plato;
+ DROP EXTERNAL DATA SOURCE MyDataSource;
+ )");
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0}};
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(DropExternalDataSource) {
+ NYql::TAstParseResult res = SqlToYql(R"(
+ USE plato;
+ pragma TablePathPrefix='/aba';
+ DROP EXTERNAL DATA SOURCE MyDataSource;
+ )");
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyDataSource");
+ UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0}};
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
}