aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-08-08 12:14:21 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-08-08 13:19:14 +0300
commit6531528c7f69c09862eb0c8449c970aa98e0a623 (patch)
tree48ca34daecc5c3c9c362011535ac6b24b3bb7ef4
parent3f450d2fb597681b15b0848b7cccfaa899143a8c (diff)
downloadydb-6531528c7f69c09862eb0c8449c970aa98e0a623.tar.gz
Fix build dq task with channels and source
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h3
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp112
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/query.cpp4
3 files changed, 106 insertions, 13 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 61f2db18f8..3558d2bc73 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -756,8 +756,7 @@ protected:
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
- YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1,
- "multiple sources or sources mixed with connections");
+ YQL_ENSURE(stage.SourcesSize() == 1, "multiple sources in one task are not supported");
const auto& stageSource = stage.GetSources(0);
const auto& externalSource = stageSource.GetExternalSource();
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 18e0b3766b..0ec78b3749 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -29,8 +29,15 @@ R"({"key": "1", "value": "trololo"}
{"key": "2", "value": "hello world"}
)"sv;
+constexpr TStringBuf TEST_CONTENT_KEYS =
+R"({"key": "1"}
+{"key": "3"}
+)"sv;
+
const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])";
+const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";
+
bool InitAwsApi() {
Aws::InitAPI(Aws::SDKOptions());
return true;
@@ -64,19 +71,26 @@ void CreateBucket(const TString& bucket) {
CreateBucket(bucket, s3Client);
}
-void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
- CreateBucket(bucket, s3Client);
+void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
+ Aws::S3::Model::PutObjectRequest req;
+ req.WithBucket(bucket).WithKey(object);
- {
- Aws::S3::Model::PutObjectRequest req;
- req.WithBucket(bucket).WithKey(object);
+ auto inputStream = std::make_shared<std::stringstream>();
+ *inputStream << content;
+ req.SetBody(inputStream);
+ const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
+ UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
+}
- auto inputStream = std::make_shared<std::stringstream>();
- *inputStream << content;
- req.SetBody(inputStream);
- const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
- UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
- }
+void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ UploadObject(bucket, object, content, s3Client);
+}
+
+void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
+ CreateBucket(bucket, s3Client);
+ UploadObject(bucket, object, content, s3Client);
}
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
@@ -815,6 +829,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n");
UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n");
}
+
+ Y_UNIT_TEST(JoinTwoSources) {
+ using namespace fmt::literals;
+ const TString dataSource = "/Root/data_source";
+ const TString bucket = "test_bucket_mixed";
+ const TString dataTable = "/Root/data";
+ const TString dataObject = "data";
+ const TString keysTable = "/Root/keys";
+ const TString keysObject = "keys";
+
+ {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+ CreateBucket(bucket, s3Client);
+ UploadObject(bucket, dataObject, TEST_CONTENT, s3Client);
+ UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client);
+ }
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{data_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{bucket_location}",
+ AUTH_METHOD="NONE"
+ );
+
+ CREATE EXTERNAL TABLE `{data_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{data_source}",
+ LOCATION="{data_object}",
+ FORMAT="json_each_row"
+ );
+
+ CREATE EXTERNAL TABLE `{keys_table}` (
+ key Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{data_source}",
+ LOCATION="{keys_object}",
+ FORMAT="json_each_row"
+ );
+ )",
+ "data_source"_a = dataSource,
+ "bucket_location"_a = GetBucketLocation(bucket),
+ "data_table"_a = dataTable,
+ "data_object"_a = dataObject,
+ "keys_table"_a = keysTable,
+ "keys_object"_a = keysObject
+ );
+ auto schemeQueryesult = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(schemeQueryesult.GetStatus() == NYdb::EStatus::SUCCESS, schemeQueryesult.GetIssues().ToString());
+
+ const TString sql = fmt::format(R"(
+ SELECT * FROM `{data_table}`
+ WHERE key IN (
+ SELECT key FROM `{keys_table}`
+ )
+ )",
+ "data_table"_a = dataTable,
+ "keys_table"_a = keysTable);
+
+ auto db = kikimr.GetQueryClient();
+ auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+ resultFuture.Wait();
+ UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
+ auto result = resultFuture.GetValueSync().GetResultSetParser(0);
+ UNIT_ASSERT_VALUES_EQUAL(result.RowsCount(), 1);
+ UNIT_ASSERT(result.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("key").GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), "trololo");
+ UNIT_ASSERT(!result.TryNextRow());
+ }
}
} // namespace NKqp
diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/ydb_query/query.cpp
index 7da774d5e7..9fae3953e9 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/query.cpp
@@ -14,6 +14,10 @@ TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const {
return ResultSets_[resultIndex];
}
+TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const {
+ return TResultSetParser(GetResultSet(resultIndex));
+}
+
TScriptExecutionOperation::TScriptExecutionOperation(TStatus&& status, Ydb::Operations::Operation&& operation)
: TOperation(std::move(status), std::move(operation))
{