aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2022-08-04 20:10:25 +0300
committerhcpp <hcpp@ydb.tech>2022-08-04 20:10:25 +0300
commit9b0d93190b58ed55a168e9636f71d4518247c124 (patch)
tree49e6e6a1d710cb8a1da2761080a6eebb19714b63
parent201de44fb980264f8de4a0f5f4edab77c4dcebb3 (diff)
downloadydb-9b0d93190b58ed55a168e9636f71d4518247c124.tar.gz
PR from branch users/hcpp/bindings_explicit_partitioning
test has been fixed first version
-rw-r--r--CMakeLists.darwin.txt4
-rw-r--r--CMakeLists.linux.txt4
-rw-r--r--ydb/core/yq/libs/actors/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp17
-rw-r--r--ydb/core/yq/libs/control_plane_storage/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.h19
-rw-r--r--ydb/public/api/protos/fq.proto42
7 files changed, 84 insertions, 4 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 65ba8074c97..c6635466853 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -686,6 +686,7 @@ add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/yq/libs/actors)
+add_subdirectory(library/cpp/scheme)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
add_subdirectory(ydb/core/yq/libs/checkpointing_common)
@@ -788,6 +789,7 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
add_subdirectory(ydb/core/yq/libs/db_schema)
+add_subdirectory(ydb/library/yql/providers/s3/path_generator)
add_subdirectory(ydb/core/yq/libs/grpc)
add_subdirectory(ydb/core/yq/libs/private_client)
add_subdirectory(ydb/core/yq/libs/result_formatter)
@@ -822,7 +824,6 @@ add_subdirectory(library/cpp/containers/disjoint_interval_tree)
add_subdirectory(ydb/library/persqueue/obfuscate)
add_subdirectory(ydb/library/persqueue/counter_time_keeper)
add_subdirectory(ydb/core/ymq/actor)
-add_subdirectory(library/cpp/scheme)
add_subdirectory(ydb/core/mind/address_classification)
add_subdirectory(ydb/core/cms/console)
add_subdirectory(library/cpp/actors/http)
@@ -1363,7 +1364,6 @@ add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
add_subdirectory(ydb/library/yql/providers/common/ut_helpers)
add_subdirectory(ydb/library/yql/providers/s3/compressors)
-add_subdirectory(ydb/library/yql/providers/s3/path_generator)
add_subdirectory(ydb/library/yql/providers/s3/serializations)
add_subdirectory(ydb/library/yql/providers/function/common)
add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 790425d9501..e23b7f65d1a 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -690,6 +690,7 @@ add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/yq/libs/actors)
+add_subdirectory(library/cpp/scheme)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
add_subdirectory(ydb/core/yq/libs/checkpointing_common)
@@ -792,6 +793,7 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme)
add_subdirectory(ydb/core/yq/libs/db_schema)
+add_subdirectory(ydb/library/yql/providers/s3/path_generator)
add_subdirectory(ydb/core/yq/libs/grpc)
add_subdirectory(ydb/core/yq/libs/private_client)
add_subdirectory(ydb/core/yq/libs/result_formatter)
@@ -826,7 +828,6 @@ add_subdirectory(library/cpp/containers/disjoint_interval_tree)
add_subdirectory(ydb/library/persqueue/obfuscate)
add_subdirectory(ydb/library/persqueue/counter_time_keeper)
add_subdirectory(ydb/core/ymq/actor)
-add_subdirectory(library/cpp/scheme)
add_subdirectory(ydb/core/mind/address_classification)
add_subdirectory(ydb/core/cms/console)
add_subdirectory(library/cpp/actors/http)
@@ -1386,7 +1387,6 @@ add_subdirectory(ydb/library/yql/parser/lexer_common/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
add_subdirectory(ydb/library/yql/providers/common/ut_helpers)
-add_subdirectory(ydb/library/yql/providers/s3/path_generator)
add_subdirectory(ydb/library/yql/providers/function/common)
add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/gateway)
diff --git a/ydb/core/yq/libs/actors/CMakeLists.txt b/ydb/core/yq/libs/actors/CMakeLists.txt
index f71100fa73a..42f2b4e58d1 100644
--- a/ydb/core/yq/libs/actors/CMakeLists.txt
+++ b/ydb/core/yq/libs/actors/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(yq-libs-actors PUBLIC
cpp-json-yson
cpp-monlib-dynamic_counters
library-cpp-random_provider
+ library-cpp-scheme
cpp-string_utils-quote
library-cpp-time_provider
library-cpp-yson
diff --git a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp
index fb010679b3e..95a71b6c2f2 100644
--- a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp
+++ b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp
@@ -2,6 +2,9 @@
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/core/yq/libs/result_formatter/result_formatter.h>
+
+#include <library/cpp/scheme/scheme.h>
+
#include <util/generic/vector.h>
namespace NYq {
@@ -17,6 +20,8 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande
TString compression;
TString schema;
THashMap<TString, TString> formatSettings;
+ NSc::TValue projection;
+ NSc::TValue partitionedBy;
switch (binding.content().setting().binding_case()) {
case YandexQuery::BindingSetting::kDataStreams: {
clusterType = PqProviderName;
@@ -41,6 +46,10 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande
compression = s.compression();
schema = FormatSchema(s.schema());
formatSettings = {s.format_setting().begin(), s.format_setting().end()};
+ for (const auto& [key, value]: s.projection()) {
+ projection[key] = value;
+ }
+ partitionedBy.AppendAll(s.partitioned_by());
break;
}
@@ -67,6 +76,14 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande
}
bindSettings.Settings["schema"] = schema;
+ if (!projection.DictEmpty()) {
+ bindSettings.Settings["projection"] = projection.ToJsonPretty();
+ }
+
+ if (!partitionedBy.ArrayEmpty()) {
+ bindSettings.Settings["partitioned_by"] = partitionedBy.ToJsonPretty();
+ }
+
// todo: use visibility to fill either PrivateBindings or ScopedBindings
sqlSettings.PrivateBindings[binding.content().name()] = std::move(bindSettings);
}
diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
index a1fa82c7c30..729fbb206fb 100644
--- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
@@ -34,6 +34,7 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC
api-protos
cpp-client-ydb_scheme
cpp-client-ydb_table
+ providers-s3-path_generator
yql-public-issue
)
target_sources(yq-libs-control_plane_storage PRIVATE
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h
index 8510a53a4bc..06189599359 100644
--- a/ydb/core/yq/libs/control_plane_storage/request_validators.h
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h
@@ -3,6 +3,7 @@
#include "util.h"
#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/public/api/protos/yq.pb.h>
@@ -11,6 +12,8 @@
#include <util/string/builder.h>
#include <util/string/cast.h>
+#include <library/cpp/scheme/scheme.h>
+
namespace NYq {
template<class P>
@@ -118,6 +121,22 @@ NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::Bin
const YandexQuery::ObjectStorageBinding objectStorage = setting.object_storage();
for (const auto& subset: objectStorage.subset()) {
issues.AddIssues(ValidateFormatSetting(subset.format_setting()));
+ if (subset.projection_size() || subset.partitioned_by_size()) {
+ try {
+ TVector<TString> partitionedBy{subset.partitioned_by().begin(), subset.partitioned_by().end()};
+ TString projectionStr;
+ if (subset.projection_size()) {
+ NSc::TValue projection;
+ for (const auto& [key, value]: subset.projection()) {
+ projection[key] = value;
+ }
+ projectionStr = projection.ToJsonPretty();
+ }
+ NYql::NPathGenerator::CreatePathGenerator(projectionStr, partitionedBy); // an exception is thrown if an error occurs
+ } catch (...) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage()));
+ }
+ }
}
break;
}
diff --git a/ydb/public/api/protos/fq.proto b/ydb/public/api/protos/fq.proto
index d68ade17f2b..4424cd46aa9 100644
--- a/ydb/public/api/protos/fq.proto
+++ b/ydb/public/api/protos/fq.proto
@@ -629,6 +629,48 @@ message ObjectStorageBinding {
map<string, string> format_setting = 3 [(Ydb.size).le = 100];
string compression = 4 [(Ydb.length).le = 1024];
Schema schema = 5;
+
+ /*
+ Partition projection is used to speed up the processing of highly partitioned
+ storages and automate the management of partitions. In partition projection, partition values and
+ locations are calculated from configuration rather than read from an object storage. Depending on the
+ specific characteristics of the query and underlying data, partition projection can significantly
+ reduce query execution time if it uses partitioning constraints on partition metadata retrieval. Similar
+ functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
+ Only enum, integer and date types are supported for path generation. When using projection, there must
+ be at least one element in partitioned_by. This behavior is introduced for symmetric query usage and
+ compatibility with Athena behavior.
+
+ Example:
+ projection = {
+ "projection.enabled" : "true", // used to enable and disable partitioning
+ "projection.city.type" : "enum", // to generate the city column, the enum type will be used (enumeration of objects separated by commas)
+ "projection.city.values" : "Washington,Roma", // column values city Washington or Roma
+ "projection.code.type" : "enum", // to generate the code column, the enum type will be used (enumeration of objects separated by commas)
+ "projection.code.values" : "0,1", // column values code 0 or 1
+ "storage.location.template" : "/${city}/${code}/${device_id}" // the template to which the generated values will be substituted
+ }
+ partitioned_by = [ "city", "device_id" ] // a subset of columns that are included in partitioning
+ - If storage.location.template and partitioned_by are specified together, then the rule from storage.location.template will be used.
+ - If only partitioned_by is specified, then the Hive Metastore format will be used for storage.location.template: "/city=${city}/device_id=${device_id}"
+ The list of paths that correspond to described projection and partitioned_by values are:
+ "/Washington/0/${device_id}", "/Washington/1/${device_id}", "/Roma/0/${device_id}", "/Roma/1/${device_id}"
+ */
+ map<string, string> projection = 6;
+
+ /*
+ By separating the data, it is possible to limit the amount of data scanned by each query, thereby improving
+ performance and reducing costs. Therefore, user data is partition by key (in practice, this is a partition by time).
+ The partitioned_by defines the keys on which to partition data. The columns described in partitioned_by
+ must be specified in the schema. If projection is not specified, the template will be generated according to
+ partitioned_by. Similar functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
+
+ Example:
+ partitioned_by = [ "city", "code", "device_id" ]
+ The corresponding storage.location.template will be as follows:
+ "/city=${city}/code=${code}/device_id=${device_id}"
+ */
+ repeated string partitioned_by = 7;
}
repeated Subset subset = 1;