about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Vasily Gerasimov <UgnineSirdis@ydb.tech> 2024-01-29 09:41:41 +0200
committer GitHub <noreply@github.com> 2024-01-29 09:41:41 +0200
commit a32284c27a7379de8cfb724ce429f89d5e5d2cb6 (patch)
tree 77581d1c5036a4c48882ea3703b83dde8b272cf4
parent 5450e7b6cb4d07b476fdfa66a26b97546a16de6e (diff)
download ydb-a32284c27a7379de8cfb724ce429f89d5e5d2cb6.tar.gz
Forbid writing/reading directly to external data source (#1204)
-rw-r--r-- ydb/core/kqp/compile_service/kqp_compile_actor.cpp 2
-rw-r--r-- ydb/core/kqp/gateway/kqp_ic_gateway.cpp 7
-rw-r--r-- ydb/core/kqp/gateway/kqp_metadata_loader.cpp 60
-rw-r--r-- ydb/core/kqp/gateway/kqp_metadata_loader.h 13
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_datasink.cpp 5
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_datasource.cpp 7
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_exec.cpp 2
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp 2
-rw-r--r-- ydb/core/kqp/session_actor/kqp_worker_actor.cpp 2
-rw-r--r-- ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp 250
-rw-r--r-- ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp 2
-rw-r--r-- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp 2
12 files changed, 318 insertions, 36 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index f02e4abc9d..283c6a2cde 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -176,7 +176,7 @@ private:
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
- TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
+ QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 4caa5e09f5..1b68a5cf1a 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -806,12 +806,7 @@ public:
return InvalidCluster<TTableMetadataResult>(cluster);
}
- settings.WithExternalDatasources_ = !CheckCluster(cluster);
- // In the case of reading from an external data source,
- // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
- // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
- // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
- return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken);
+ return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken);
} catch (yexception& e) {
return MakeFuture(ResultFromException<TTableMetadataResult>(e));
}
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index e368f3f40a..58a942d412 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -227,6 +227,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
tableMeta->SchemaVersion = description.GetVersion();
tableMeta->Kind = NYql::EKikimrTableKind::External;
+ tableMeta->TableType = NYql::ETableType::ExternalTable;
tableMeta->Attributes = entry.Attributes;
@@ -253,7 +254,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
}
TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
- const TString& cluster, const TString& tableName) {
+ const TString& cluster, const TString& mainCluster, const TString& tableName) {
const auto& description = entry.ExternalDataSourceInfo->Description;
TTableMetadataResult result;
result.SetSuccess();
@@ -263,6 +264,11 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
tableMeta->SchemaVersion = description.GetVersion();
tableMeta->Kind = NYql::EKikimrTableKind::External;
+ if (cluster == mainCluster) { // resolved external data source itself
+ tableMeta->TableType = NYql::ETableType::Unknown;
+ } else {
+ tableMeta->TableType = NYql::ETableType::Table; // wanted to resolve table in external data source
+ }
tableMeta->Attributes = entry.Attributes;
@@ -300,7 +306,7 @@ TTableMetadataResult GetViewMetadataResult(
}
TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
- const TString& cluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
+ const TString& cluster, const TString& mainCluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
using TResult = NYql::IKikimrGateway::TTableMetadataResult;
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
@@ -339,7 +345,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
result = GetExternalTableMetadataResult(entry, cluster, tableName);
break;
case EKind::KindExternalDataSource:
- result = GetExternalDataSourceMetadataResult(entry, cluster, tableName);
+ result = GetExternalDataSourceMetadataResult(entry, cluster, mainCluster, tableName);
break;
case EKind::KindView:
result = GetViewMetadataResult(entry, cluster, tableName);
@@ -697,16 +703,30 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
- const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
- Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
- auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id,
+ // In the case of reading from an external data source,
+ // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
+ // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
+ // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
+ const bool resolveEntityInsideDataSource = (cluster != Cluster);
+ TPath entityName = id;
+ if constexpr (std::is_same_v<TPath, TString>) {
+ if (resolveEntityInsideDataSource) {
+ entityName = cluster;
+ }
+ } else {
+ Y_ENSURE(!resolveEntityInsideDataSource);
+ }
+
+ const auto externalEntryItem = CreateNavigateExternalEntry(entityName, resolveEntityInsideDataSource);
+ Y_ABORT_UNLESS(!resolveEntityInsideDataSource || externalEntryItem, "External data source must be resolved using path only");
+ auto resNavigate = resolveEntityInsideDataSource ? *externalEntryItem : CreateNavigateEntry(entityName,
settings, TempTablesState);
const auto entry = resNavigate.Entry;
const auto queryName = resNavigate.QueryName;
- const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem;
- const ui64 expectedSchemaVersion = GetExpectedVersion(id);
+ const auto externalEntry = resolveEntityInsideDataSource ? std::optional<NavigateEntryResult>{} : externalEntryItem;
+ const ui64 expectedSchemaVersion = GetExpectedVersion(entityName);
- LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id));
+ LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(entityName));
auto navigate = MakeHolder<TNavigate>();
navigate->ResultSet.emplace_back(entry);
@@ -728,7 +748,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
ActorSystem,
schemeCacheId,
ev.Release(),
- [userToken, database, cluster, table, settings, expectedSchemaVersion, this, queryName]
+ [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName]
(TPromise<TResult> promise, TResponse&& response) mutable
{
try {
@@ -739,7 +759,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto& entry = InferEntry(navigate.ResultSet);
if (entry.Status != EStatus::Ok) {
- promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
+ promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table));
return;
}
@@ -759,9 +779,15 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
}
}
+ const bool resolveEntityInsideDataSource = (cluster != Cluster);
+ // resolveEntityInsideDataSource => entry.Kind == EKind::KindExternalDataSource
+ if (resolveEntityInsideDataSource && entry.Kind != EKind::KindExternalDataSource) {
+ throw yexception() << "\"" << CombinePath(entry.Path.begin(), entry.Path.end()) << "\" is expected to be external data source";
+ }
+
switch (entry.Kind) {
case EKind::KindExternalDataSource: {
- auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, table);
+ auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalDataSourceMetadata.Success()) {
promise.SetValue(externalDataSourceMetadata);
return;
@@ -772,12 +798,12 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
promise.SetValue(externalDataSourceMetadata);
});
+ break;
}
- break;
case EKind::KindExternalTable: {
YQL_ENSURE(entry.ExternalTableInfo, "expected external table info");
const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath();
- auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table);
+ auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalTableMetadata.Success()) {
promise.SetValue(externalTableMetadata);
return;
@@ -789,8 +815,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto externalDataSourceMetadata = result.GetValue();
promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata));
});
+ break;
}
- break;
case EKind::KindIndex: {
Y_ENSURE(entry.ListNodeEntry, "expected children list");
Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child");
@@ -805,10 +831,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
{
promise.SetValue(result.GetValue());
});
+ break;
}
- break;
default: {
- promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table, queryName));
+ promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table, queryName));
}
}
}
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h
index 33340ab8ba..798e9cca88 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.h
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h
@@ -14,17 +14,19 @@ namespace NKikimr::NKqp {
class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader {
public:
- explicit TKqpTableMetadataLoader(TActorSystem* actorSystem,
- NYql::TKikimrConfiguration::TPtr config,
- bool needCollectSchemeData = false,
+ explicit TKqpTableMetadataLoader(const TString& cluster,
+ TActorSystem* actorSystem,
+ NYql::TKikimrConfiguration::TPtr config,
+ bool needCollectSchemeData = false,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20))
- : NeedCollectSchemeData(needCollectSchemeData)
+ : Cluster(cluster)
+ , NeedCollectSchemeData(needCollectSchemeData)
, ActorSystem(actorSystem)
, Config(config)
, TempTablesState(std::move(tempTablesState))
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
- {};
+ {}
NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata(
const TString& cluster, const TString& table, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, const TString& database,
@@ -56,6 +58,7 @@ private:
void OnLoadedTableMetadata(NYql::IKikimrGateway::TTableMetadataResult& loadTableMetadataResult);
+ const TString Cluster;
TVector<NKikimrKqp::TKqpTableMetadataProto> CollectedSchemeData;
TMutex Lock;
bool NeedCollectSchemeData;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index 393862f3d7..e2df475c14 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -557,6 +557,11 @@ public:
return true;
}
+ if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\" without table. Please specify table to write to"));
+ return false;
+ }
+
if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) {
YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
return true;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 810057fe28..4e45f0a218 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -256,7 +256,7 @@ public:
}
}
break;
- default:
+ default:
break;
}
*result = value;
@@ -679,6 +679,11 @@ public:
auto& tableDesc = SessionCtx->Tables().GetTable(cluster, tablePath);
if (key.GetKeyType() == TKikimrKey::Type::Table) {
if (tableDesc.Metadata->Kind == EKikimrTableKind::External) {
+ if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
+ ctx.AddError(TIssue(node->Pos(ctx),
+ TStringBuilder() << "Attempt to read from external data source \"" << tablePath << "\" without table. Please specify table to read from"));
+ return nullptr;
+ }
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
ctx.Step.Repeat(TExprStep::DiscoveryIO)
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index bd58a9c7e7..50c1de4dde 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -923,7 +923,7 @@ public:
auto tableTypeItem = table.Metadata->TableType;
if (tableTypeItem == ETableType::ExternalTable && !SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()),
- TStringBuilder() << "External table are disabled. Please contact your system administrator to enable it"));
+ TStringBuilder() << "External tables are disabled. Please contact your system administrator to enable it"));
return SyncError();
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index f55ac55afd..a072b09923 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -73,7 +73,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters);
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
- std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
+ std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig());
}
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 956ba602d4..f2d7b7aba9 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -181,7 +181,7 @@ public:
QueryState->RequestEv.reset(ev->Release().Release());
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
- TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
+ Settings.Cluster, TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig);
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 74ab9977ab..69d72ee6e0 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -723,7 +723,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached);
}
- Y_UNIT_TEST(InsertIntoBucketCaching) {
+ Y_UNIT_TEST(InsertIntoBucketCaching) {
const TString writeDataSourceName = "/Root/write_data_source";
const TString writeTableName = "/Root/write_binding";
const TString writeBucket = "test_bucket_cache";
@@ -1343,6 +1343,254 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString());
UNIT_ASSERT(scriptResult.GetPlan());
}
+
+ Y_UNIT_TEST(ReadFromDataSourceWithoutTable) {
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString bucket = "test_bucket_inline_desc";
+ const TString object = "test_object_inline_desc";
+
+ CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"sql(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)sql",
+ "external_source"_a = externalDataSourceName,
+ "location"_a = GetBucketLocation(bucket),
+ "object"_a = object
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ {
+ const TString sql = fmt::format(R"sql(
+ SELECT * FROM `{external_data_source}`;
+ )sql",
+ "external_data_source"_a=externalDataSourceName);
+
+ auto db = kikimr->GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed);
+ UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Attempt to read from external data source");
+ }
+
+ // select using inline syntax is well
+ {
+ const TString sql = fmt::format(R"sql(
+ SELECT * FROM `{external_data_source}`.`{obj_path}`
+ WITH (
+ SCHEMA = (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ),
+ FORMAT = "json_each_row"
+ )
+ )sql",
+ "external_data_source"_a=externalDataSourceName,
+ "obj_path"_a = object);
+
+ auto db = kikimr->GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ }
+ }
+
+ Y_UNIT_TEST(InsertIntoDataSourceWithoutTable) {
+ const TString readDataSourceName = "/Root/read_data_source";
+ const TString readTableName = "/Root/read_binding";
+ const TString readBucket = "test_bucket_read_insert_into_data_source";
+ const TString readObject = "test_object_read";
+ const TString writeDataSourceName = "/Root/write_data_source";
+ const TString writeTableName = "/Root/write_binding";
+ const TString writeBucket = "test_bucket_write_insert_into_data_source";
+ const TString writeObject = "test_object_write/";
+
+ {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+ CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client);
+ CreateBucket(writeBucket, s3Client);
+ }
+
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"sql(
+ CREATE EXTERNAL DATA SOURCE `{read_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{read_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{read_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{read_source}",
+ LOCATION="{read_object}",
+ FORMAT="json_each_row"
+ );
+
+ CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{write_location}",
+ AUTH_METHOD="NONE"
+ );
+ )sql",
+ "read_source"_a = readDataSourceName,
+ "read_table"_a = readTableName,
+ "read_location"_a = GetBucketLocation(readBucket),
+ "read_object"_a = readObject,
+ "write_source"_a = writeDataSourceName,
+ "write_table"_a = writeTableName,
+ "write_location"_a = GetBucketLocation(writeBucket)
+ );
+ auto schemeResult = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString());
+
+ {
+ const TString sql = fmt::format(R"sql(
+ INSERT INTO `{write_source}`
+ SELECT * FROM `{read_table}`
+ )sql",
+ "read_table"_a=readTableName,
+ "write_source"_a = writeDataSourceName);
+
+ auto db = kikimr->GetQueryClient();
+ auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
+ UNIT_ASSERT_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, static_cast<int>(result.GetStatus()) << ", " << result.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Attempt to write to external data source");
+ }
+
+ // insert with inline syntax is well
+ {
+ Cerr << "Run inplace insert" << Endl;
+ const TString sql = fmt::format(R"sql(
+ INSERT INTO `{write_source}`.`{write_object}`
+ WITH (
+ SCHEMA = (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ),
+ FORMAT = "json_each_row"
+ )
+ SELECT * FROM `{read_table}`
+ )sql",
+ "read_table"_a=readTableName,
+ "write_source"_a = writeDataSourceName,
+ "write_object"_a = writeObject);
+
+ auto db = kikimr->GetQueryClient();
+ auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(SpecifyExternalTableInsteadOfExternalDataSource) {
+ const TString externalDataSourceName = "external_data_source";
+ const TString externalTableName = "external_table";
+ const TString bucket = "test_bucket_specify_external_table";
+ const TString object = "test_object_specify_external_table";
+
+ CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"sql(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+
+ CREATE EXTERNAL TABLE `{external_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="{object}",
+ FORMAT="json_each_row"
+ );
+ )sql",
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
+ "location"_a = GetBucketLocation(bucket),
+ "object"_a = object
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ {
+ const TString sql = fmt::format(R"sql(
+ SELECT * FROM `{external_table}`.`{object}`
+ WITH (
+ SCHEMA = (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ),
+ FORMAT = "json_each_row"
+ );
+ )sql",
+ "external_table"_a=externalTableName,
+ "object"_a = object);
+
+ auto db = kikimr->GetQueryClient();
+ auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL_C(queryExecutionOperation.GetStatus(), EStatus::SUCCESS, queryExecutionOperation.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source");
+ }
+
+ {
+ const TString sql = fmt::format(R"sql(
+ INSERT INTO `{external_table}`.`{object}`
+ WITH (
+ SCHEMA = (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ),
+ FORMAT = "json_each_row"
+ )
+ SELECT * FROM `{external_table}` WHERE key = '42';
+ )sql",
+ "external_table"_a=externalTableName,
+ "object"_a = object);
+
+ auto db = kikimr->GetQueryClient();
+ auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL_C(queryExecutionOperation.GetStatus(), EStatus::SUCCESS, queryExecutionOperation.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source");
+ }
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 3215bf2717..7e0fbc7c16 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -34,7 +34,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) {
auto counters = MakeIntrusive<TKqpRequestCounters>();
counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters);
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
- std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false);
+ std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false);
return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig());
}
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index aecd1b44ba..1f53b50a83 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -4882,7 +4882,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
- UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External table are disabled. Please contact your system administrator to enable it");
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External tables are disabled. Please contact your system administrator to enable it");
}
Y_UNIT_TEST(CreateExternalTableCheckPrimaryKey) {