aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-09-21 17:23:53 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-09-21 17:44:32 +0300
commit46510fe73a29904c64c8b51f5d182d4b160b68e9 (patch)
treefdfc90c28facde07bede0da76965861b2bd1475b
parent4dc08f464b9d71f6a75cf3ce9517fc733529e69b (diff)
downloadydb-46510fe73a29904c64c8b51f5d182d4b160b68e9.tar.gz
Allow forbidden callables in effects queries without YDB (S3-only)
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp33
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp117
2 files changed, 142 insertions, 8 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 3054575846a..a0238fca5aa 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -527,7 +527,7 @@ public:
return TStatus::Error;
}
- if (!CheckEffectsTx(tx.Cast(), ctx)) {
+ if (!CheckEffectsTx(tx.Cast(), query, ctx)) {
return TStatus::Error;
}
@@ -543,7 +543,34 @@ public:
}
private:
- bool CheckEffectsTx(TKqpPhysicalTx tx, TExprContext& ctx) const {
+ bool HasTableEffects(const TKqlQuery& query) const {
+ for (const TExprBase& effect : query.Effects()) {
+ if (auto maybeSinkEffect = effect.Maybe<TKqpSinkEffect>()) {
+ // (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0)
+ auto sinkEffect = maybeSinkEffect.Cast();
+ const size_t sinkIndex = FromString(TStringBuf(sinkEffect.SinkIndex()));
+ auto stageExpr = sinkEffect.Stage();
+ auto maybeStageBase = stageExpr.Maybe<TDqStageBase>();
+ YQL_ENSURE(maybeStageBase);
+ auto stage = maybeStageBase.Cast();
+ YQL_ENSURE(stage.Outputs());
+ auto outputs = stage.Outputs().Cast();
+ YQL_ENSURE(sinkIndex < outputs.Size());
+ auto maybeSink = outputs.Item(sinkIndex);
+ YQL_ENSURE(maybeSink.Maybe<TDqSink>());
+ auto sink = maybeSink.Cast<TDqSink>();
+ auto dataSink = TCoDataSink(sink.DataSink().Ptr());
+ if (dataSink.Category() == YdbProviderName || dataSink.Category() == KikimrProviderName) {
+ return true;
+ }
+ } else { // Not a SinkEffect, => a YDB table effect
+ return true;
+ }
+ }
+ return false;
+ }
+
+ bool CheckEffectsTx(TKqpPhysicalTx tx, const TKqlQuery& query, TExprContext& ctx) const {
TMaybeNode<TExprBase> blackistedNode;
VisitExpr(tx.Ptr(), [&blackistedNode](const TExprNode::TPtr& exprNode) {
if (blackistedNode) {
@@ -565,7 +592,7 @@ private:
return true;
});
- if (blackistedNode) {
+ if (blackistedNode && HasTableEffects(query)) {
ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder()
<< "Callable not expected in effects tx: " << blackistedNode.Cast<TCallable>().CallableName()));
return false;
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index b0468977916..bc48c8e10d1 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -922,7 +922,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"write_object"_a = writeObject,
"read_table"_a = readTableName);
-
+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
@@ -1004,7 +1004,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
resultFuture.Wait();
UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
-
+
UNIT_ASSERT_NO_DIFF(resultFuture.GetValueSync().GetIssues().ToString(), "<main>: Error: Pre type annotation, code: 1020\n"
" <main>:3:27: Error: Write mode 'update' is not supported for external entities\n");
}
@@ -1129,7 +1129,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
-
+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
@@ -1198,7 +1198,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
-
+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
@@ -1259,7 +1259,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
-
+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
@@ -1274,6 +1274,113 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.GetValue(0).GetProto().bytes_value(), content);
}
}
+
+ Y_UNIT_TEST(ForbiddenCallablesForYdbTables) {
+ using namespace fmt::literals;
+ const TString readDataSourceName = "/Root/read_data_source";
+ const TString readTableName = "/Root/read_table";
+ const TString readBucket = "test_read_bucket_forbidden_callables";
+ const TString readObject = "test_object_forbidden_callables";
+ const TString writeDataSourceName = "/Root/write_data_source";
+ const TString writeTableName = "/Root/write_table";
+ const TString writeBucket = "test_write_bucket_forbidden_callables";
+ const TString writeObject = "test_object_forbidden_callables/";
+ const TString writeYdbTable = "/Root/test_ydb_table";
+
+ {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+ CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client);
+ CreateBucket(writeBucket, s3Client);
+ }
+
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{read_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{read_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{read_table}` (
+ key Utf8, -- Nullable
+ value Utf8 -- Nullable
+ ) WITH (
+ DATA_SOURCE="{read_source}",
+ LOCATION="{read_object}",
+ FORMAT="json_each_row"
+ );
+
+ CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{write_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{write_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{write_source}",
+ LOCATION="{write_object}",
+ FORMAT="json_each_row"
+ );
+
+ CREATE TABLE `{write_ydb_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL,
+ PRIMARY KEY (key)
+ );
+ )",
+ "read_source"_a = readDataSourceName,
+ "read_table"_a = readTableName,
+ "read_location"_a = GetBucketLocation(readBucket),
+ "read_object"_a = readObject,
+ "write_source"_a = writeDataSourceName,
+ "write_table"_a = writeTableName,
+ "write_location"_a = GetBucketLocation(writeBucket),
+ "write_object"_a = writeObject,
+ "write_ydb_table"_a = writeYdbTable
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ // Forbidden callable like Unwrap is allowed in S3-only queries,
+ // but not allowed in mixed queries.
+ {
+ const TString sql = fmt::format(R"(
+ INSERT INTO `{write_table}`
+ SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}`
+ )",
+ "read_table"_a=readTableName,
+ "write_table"_a = writeTableName);
+
+ auto db = kikimr->GetQueryClient();
+ auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+ resultFuture.Wait();
+ UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
+ }
+
+ // Unwrap is used in query with effect applied to YDB table.
+ {
+ const TString sql = fmt::format(R"(
+ INSERT INTO `{write_table}`
+ SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}`;
+
+ DELETE FROM `{write_ydb_table}`
+ WHERE key = "42";
+ )",
+ "read_table"_a=readTableName,
+ "write_table"_a = writeTableName,
+ "write_ydb_table"_a = writeYdbTable);
+
+ auto db = kikimr->GetQueryClient();
+ auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+ resultFuture.Wait();
+ UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap");
+ }
+ }
}
} // namespace NKqp