aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-08-22 16:59:45 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-08-22 17:36:02 +0300
commitd40726ba705519837b014cb71110b86258906f0d (patch)
tree496a6cdf0860dce49d9f343ea5f34a0952a138bc
parent3af30f8d6f0ef3a03c4f64174e2296d97043c958 (diff)
downloadydb-d40726ba705519837b014cb71110b86258906f0d.tar.gz
KIKIMR-18756: ExternalTable/ExternalSource version check
-rw-r--r--ydb/core/kqp/common/kqp_resolve.h6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp6
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp14
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h25
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.cpp9
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp159
-rw-r--r--ydb/core/protos/kqp_physical.proto1
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp3
10 files changed, 211 insertions, 15 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h
index aca17f50a7b..0c9553e89d9 100644
--- a/ydb/core/kqp/common/kqp_resolve.h
+++ b/ydb/core/kqp/common/kqp_resolve.h
@@ -19,7 +19,8 @@ enum class ETableKind {
Unknown = 0,
Datashard,
SysView,
- Olap
+ Olap,
+ External
};
class TKqpTableKeys {
@@ -99,6 +100,9 @@ public:
case NKqpProto::TABLE_KIND_SYS_VIEW:
TableKind = ETableKind::SysView;
break;
+ case NKqpProto::TABLE_KIND_EXTERNAL:
+ TableKind = ETableKind::External;
+ return;
default:
YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind());
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 735da43d1fa..219cfaeca6e 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -854,6 +854,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
switch (tableInfo->TableKind) {
case ETableKind::Unknown:
+ case ETableKind::External:
case ETableKind::SysView: {
protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
break;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index 4425e5ca4e7..63bf0fcc28d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -518,7 +518,7 @@ public:
YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
return nullptr;
}
-
+
ctx.Step.Repeat(TExprStep::DiscoveryIO)
.Repeat(TExprStep::Epochs)
.Repeat(TExprStep::Intents)
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 3ae9aa60946..fb93b5ff730 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -59,6 +59,7 @@ TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ct
items.emplace_back(ctx.NewList(pos, std::move(children)));
}
+
return ctx.NewList(pos, std::move(items));
}
@@ -689,7 +690,7 @@ public:
.Repeat(TExprStep::RewriteIO);
TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) });
auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path});
- auto key = ctx.NewCallable(node->Pos(), "Key", {table});
+ auto newKey = ctx.NewCallable(node->Pos(), "Key", {table});
auto newRead = Build<TCoRead>(ctx, node->Pos())
.World(read->Child(0))
.DataSource(
@@ -701,9 +702,10 @@ public:
.Done().Ptr()
)
.FreeArgs()
- .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key}))
+ .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {newKey}))
.Add(ctx.NewCallable(node->Pos(), "Void", {}))
.Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent))
+
.Build()
.Done().Ptr();
auto retChildren = node->ChildrenList();
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index b7da08f0106..84f448d36f5 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -107,6 +107,8 @@ NKqpProto::EKqpPhyTableKind GetPhyTableKind(EKikimrTableKind kind) {
return NKqpProto::TABLE_KIND_OLAP;
case EKikimrTableKind::SysView:
return NKqpProto::TABLE_KIND_SYS_VIEW;
+ case EKikimrTableKind::External:
+ return NKqpProto::TABLE_KIND_EXTERNAL;
default:
return NKqpProto::TABLE_KIND_UNSPECIFIED;
}
@@ -786,6 +788,18 @@ private:
FillTable(*tableMeta, std::move(tableColumns), *txProto.AddTables());
}
+ for (const auto& [a, desc] : TablesData->GetTables()) {
+ auto tableMeta = desc.Metadata;
+ YQL_ENSURE(tableMeta);
+ if (desc.Metadata->Kind == NYql::EKikimrTableKind::External) {
+ THashSet<TStringBuf> columns;
+ for (const auto& [col, _]: tableMeta->Columns){
+ columns.emplace(col);
+ }
+ FillTable(*tableMeta, std::move(columns), *txProto.AddTables());
+ }
+ }
+
for (const auto& secretName : SecretNames) {
txProto.AddSecretNames(secretName);
}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index f2013bb807e..92319c8971f 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -178,17 +178,16 @@ public:
// todo: gvit
// fill this hash set only once on query compilation.
void FillTables(const NKqpProto::TKqpPhyTx& phyTx) {
+ auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) {
+ NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId());
+ auto it = TableVersions.find(tableId);
+ if (it != TableVersions.end()) {
+ Y_ENSURE(it->second == table.GetVersion());
+ } else {
+ TableVersions.emplace(tableId, table.GetVersion());
+ }
+ };
for (const auto& stage : phyTx.GetStages()) {
-
- auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) {
- NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId());
- auto it = TableVersions.find(tableId);
- if (it != TableVersions.end()) {
- Y_ENSURE(it->second == table.GetVersion());
- } else {
- TableVersions.emplace(tableId, table.GetVersion());
- }
- };
for (const auto& tableOp : stage.GetTableOps()) {
addTable(tableOp.GetTable());
}
@@ -209,6 +208,12 @@ public:
}
}
}
+
+ for (const auto& table : phyTx.GetTables()) {
+ if (table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_EXTERNAL) {
+ addTable(table.GetId());
+ }
+ }
}
bool NeedCheckTableVersions() const {
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
index ed0dcaaa078..901fed55c01 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
@@ -178,6 +178,15 @@ bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query) {
if (tx.GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
return false;
}
+
+ for (const auto& stage : tx.GetStages()) {
+ for (const auto& source : stage.GetSources()) {
+ // S3 provider stores S3 paths to read in AST, so we can't cache such queries
+ if (source.HasExternalSource() && source.GetExternalSource().GetType() == "S3Source") {
+ return false;
+ }
+ }
+ }
}
return true;
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 42608e7fa3c..93e644cba81 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -12,6 +12,7 @@
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
#include <fmt/format.h>
@@ -313,6 +314,63 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
}
}
+ Y_UNIT_TEST(ExecuteScriptWithS3ReadNotCached) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString externalTableName = "/Root/test_binding_resolve";
+ const TString bucket = "test_bucket1";
+ const TString object = "test_object";
+
+ CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{external_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="{object}",
+ FORMAT="json_each_row"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
+ "location"_a = GetBucketLocation(bucket),
+ "object"_a = object
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto settings = TExecuteScriptSettings().StatsMode(Ydb::Query::STATS_MODE_BASIC);
+
+ const TString sql = fmt::format(R"(
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a=externalTableName);
+
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
+
+ scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync();
+ readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
+ }
+
Y_UNIT_TEST(ExecuteScriptWithDataSource) {
using namespace fmt::literals;
const TString externalDataSourceName = "/Root/external_data_source";
@@ -830,6 +888,107 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n");
}
+ void ExecuteInsertQuery(TQueryClient& client, const TString& writeTableName, const TString& readTableName, bool expectCached) {
+ using namespace fmt::literals;
+ const TString sql = fmt::format(R"(
+ INSERT INTO `{write_table}`
+ SELECT * FROM `{read_table}`;
+ )",
+ "write_table"_a = writeTableName,
+ "read_table"_a = readTableName);
+ auto settings = TExecuteQuerySettings().StatsMode(EStatsMode::Basic);
+ auto resultFuture = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings);
+ resultFuture.Wait();
+ UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
+ auto& stats = NYdb::TProtoAccessor::GetProto(*resultFuture.GetValueSync().GetStats());
+ UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached);
+ }
+
+ Y_UNIT_TEST(InsertIntoBucketCaching) {
+ using namespace fmt::literals;
+ const TString writeDataSourceName = "/Root/write_data_source";
+ const TString writeTableName = "/Root/write_binding";
+ const TString writeBucket = "test_bucket_cache";
+ const TString writeObject = "test_object_write/";
+ const TString writeAnotherObject = "test_another_object_write/";
+ const TString readTableName = "/Root/read_table";
+ {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+ CreateBucket(writeBucket, s3Client);
+ }
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ {
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{write_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{write_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{write_source}",
+ LOCATION="{write_object}",
+ FORMAT="tsv_with_names"
+ );
+
+ CREATE TABLE `{read_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL,
+ PRIMARY KEY (key)
+ );
+ )",
+ "write_source"_a = writeDataSourceName,
+ "write_table"_a = writeTableName,
+ "write_location"_a = GetBucketLocation(writeBucket),
+ "write_object"_a = writeObject,
+ "read_table"_a = readTableName);
+
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const TString query = fmt::format(R"(
+ REPLACE INTO `{read_table}` (key, value) VALUES
+ ("1", "one")
+ )",
+ "read_table"_a = readTableName);
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ auto db = kikimr.GetQueryClient();
+ ExecuteInsertQuery(db, writeTableName, readTableName, false);
+ ExecuteInsertQuery(db, writeTableName, readTableName, true);
+ {
+ const TString modifyQuery = fmt::format(R"(
+ DROP EXTERNAL TABLE `{write_table}`;
+ CREATE EXTERNAL TABLE `{write_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{write_source}",
+ LOCATION="{write_object}",
+ FORMAT="tsv_with_names"
+ );
+ )",
+ "write_table"_a = writeTableName,
+ "write_object"_a = writeAnotherObject,
+ "write_source"_a = writeDataSourceName);
+ auto result = session.ExecuteSchemeQuery(modifyQuery).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ ExecuteInsertQuery(db, writeTableName, readTableName, false);
+
+ UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3);
+ }
+
Y_UNIT_TEST(JoinTwoSources) {
using namespace fmt::literals;
const TString dataSource = "/Root/data_source";
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index fd208e72a30..3f44bc24818 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -74,6 +74,7 @@ enum EKqpPhyTableKind {
TABLE_KIND_DS = 1;
TABLE_KIND_OLAP = 2;
TABLE_KIND_SYS_VIEW = 3;
+ TABLE_KIND_EXTERNAL = 4;
}
message TKqpPhyTableId {
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 77e76a54a2d..a88e2c3cbd1 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -1716,7 +1716,8 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
return SetError(context, entry, TNavigate::EStatus::PathErrorUnknown);
}
- const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable;
+ const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable ||
+ Kind == TNavigate::KindExternalTable || Kind == TNavigate::KindExternalDataSource;
const bool isTopic = Kind == TNavigate::KindTopic || Kind == TNavigate::KindCdcStream;
if (entry.Operation == TNavigate::OpTable && !isTable) {