diff options
author | hcpp <hcpp@ydb.tech> | 2022-08-04 20:10:25 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-08-04 20:10:25 +0300 |
commit | 9b0d93190b58ed55a168e9636f71d4518247c124 (patch) | |
tree | 49e6e6a1d710cb8a1da2761080a6eebb19714b63 | |
parent | 201de44fb980264f8de4a0f5f4edab77c4dcebb3 (diff) | |
download | ydb-9b0d93190b58ed55a168e9636f71d4518247c124.tar.gz |
PR from branch users/hcpp/bindings_explicit_partitioning
test has been fixed
first version
-rw-r--r-- | CMakeLists.darwin.txt | 4 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp | 17 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/request_validators.h | 19 | ||||
-rw-r--r-- | ydb/public/api/protos/fq.proto | 42 |
7 files changed, 84 insertions, 4 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 65ba8074c97..c6635466853 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -686,6 +686,7 @@ add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/yq/libs/actors) +add_subdirectory(library/cpp/scheme) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) add_subdirectory(ydb/core/yq/libs/checkpointing_common) @@ -788,6 +789,7 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) add_subdirectory(ydb/core/yq/libs/db_schema) +add_subdirectory(ydb/library/yql/providers/s3/path_generator) add_subdirectory(ydb/core/yq/libs/grpc) add_subdirectory(ydb/core/yq/libs/private_client) add_subdirectory(ydb/core/yq/libs/result_formatter) @@ -822,7 +824,6 @@ add_subdirectory(library/cpp/containers/disjoint_interval_tree) add_subdirectory(ydb/library/persqueue/obfuscate) add_subdirectory(ydb/library/persqueue/counter_time_keeper) add_subdirectory(ydb/core/ymq/actor) -add_subdirectory(library/cpp/scheme) add_subdirectory(ydb/core/mind/address_classification) add_subdirectory(ydb/core/cms/console) add_subdirectory(library/cpp/actors/http) @@ -1363,7 +1364,6 @@ add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) add_subdirectory(ydb/library/yql/providers/common/ut_helpers) add_subdirectory(ydb/library/yql/providers/s3/compressors) -add_subdirectory(ydb/library/yql/providers/s3/path_generator) add_subdirectory(ydb/library/yql/providers/s3/serializations) add_subdirectory(ydb/library/yql/providers/function/common) add_subdirectory(ydb/library/yql/providers/function/expr_nodes) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 790425d9501..e23b7f65d1a 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -690,6 +690,7 @@ add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/yq/libs/actors) +add_subdirectory(library/cpp/scheme) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) add_subdirectory(ydb/core/yq/libs/checkpointing_common) @@ -792,6 +793,7 @@ add_subdirectory(ydb/public/sdk/cpp/client/ydb_common_client/impl) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) add_subdirectory(ydb/public/sdk/cpp/client/ydb_scheme) add_subdirectory(ydb/core/yq/libs/db_schema) +add_subdirectory(ydb/library/yql/providers/s3/path_generator) add_subdirectory(ydb/core/yq/libs/grpc) add_subdirectory(ydb/core/yq/libs/private_client) add_subdirectory(ydb/core/yq/libs/result_formatter) @@ -826,7 +828,6 @@ add_subdirectory(library/cpp/containers/disjoint_interval_tree) add_subdirectory(ydb/library/persqueue/obfuscate) add_subdirectory(ydb/library/persqueue/counter_time_keeper) add_subdirectory(ydb/core/ymq/actor) -add_subdirectory(library/cpp/scheme) add_subdirectory(ydb/core/mind/address_classification) add_subdirectory(ydb/core/cms/console) add_subdirectory(library/cpp/actors/http) @@ -1386,7 +1387,6 @@ add_subdirectory(ydb/library/yql/parser/lexer_common/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) add_subdirectory(ydb/library/yql/providers/common/ut_helpers) -add_subdirectory(ydb/library/yql/providers/s3/path_generator) add_subdirectory(ydb/library/yql/providers/function/common) add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/gateway) diff --git a/ydb/core/yq/libs/actors/CMakeLists.txt b/ydb/core/yq/libs/actors/CMakeLists.txt index f71100fa73a..42f2b4e58d1 100644 --- a/ydb/core/yq/libs/actors/CMakeLists.txt +++ b/ydb/core/yq/libs/actors/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(yq-libs-actors PUBLIC cpp-json-yson cpp-monlib-dynamic_counters library-cpp-random_provider + library-cpp-scheme cpp-string_utils-quote library-cpp-time_provider library-cpp-yson diff --git a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp index fb010679b3e..95a71b6c2f2 100644 --- a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp +++ b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp @@ -2,6 +2,9 @@ #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/core/yq/libs/result_formatter/result_formatter.h> + +#include <library/cpp/scheme/scheme.h> + #include <util/generic/vector.h> namespace NYq { @@ -17,6 +20,8 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande TString compression; TString schema; THashMap<TString, TString> formatSettings; + NSc::TValue projection; + NSc::TValue partitionedBy; switch (binding.content().setting().binding_case()) { case YandexQuery::BindingSetting::kDataStreams: { clusterType = PqProviderName; @@ -41,6 +46,10 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande compression = s.compression(); schema = FormatSchema(s.schema()); formatSettings = {s.format_setting().begin(), s.format_setting().end()}; + for (const auto& [key, value]: s.projection()) { + projection[key] = value; + } + partitionedBy.AppendAll(s.partitioned_by()); break; } @@ -67,6 +76,14 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande } bindSettings.Settings["schema"] = schema; + if (!projection.DictEmpty()) { + bindSettings.Settings["projection"] = projection.ToJsonPretty(); + } + + if (!partitionedBy.ArrayEmpty()) { + bindSettings.Settings["partitioned_by"] = partitionedBy.ToJsonPretty(); + } + // todo: use visibility to fill either PrivateBindings or ScopedBindings sqlSettings.PrivateBindings[binding.content().name()] = std::move(bindSettings); } diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt index a1fa82c7c30..729fbb206fb 100644 --- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt +++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt @@ -34,6 +34,7 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC api-protos cpp-client-ydb_scheme cpp-client-ydb_table + providers-s3-path_generator yql-public-issue ) target_sources(yq-libs-control_plane_storage PRIVATE diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h index 8510a53a4bc..06189599359 100644 --- a/ydb/core/yq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h @@ -3,6 +3,7 @@ #include "util.h" #include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/public/api/protos/yq.pb.h> @@ -11,6 +12,8 @@ #include <util/string/builder.h> #include <util/string/cast.h> +#include <library/cpp/scheme/scheme.h> + namespace NYq { template<class P> @@ -118,6 +121,22 @@ NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::Bin const YandexQuery::ObjectStorageBinding objectStorage = setting.object_storage(); for (const auto& subset: objectStorage.subset()) { issues.AddIssues(ValidateFormatSetting(subset.format_setting())); + if (subset.projection_size() || subset.partitioned_by_size()) { + try { + TVector<TString> partitionedBy{subset.partitioned_by().begin(), subset.partitioned_by().end()}; + TString projectionStr; + if (subset.projection_size()) { + NSc::TValue projection; + for (const auto& [key, value]: subset.projection()) { + projection[key] = value; + } + projectionStr = projection.ToJsonPretty(); + } + NYql::NPathGenerator::CreatePathGenerator(projectionStr, partitionedBy); // an exception is thrown if an error occurs + } catch (...) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage())); + } + } } break; } diff --git a/ydb/public/api/protos/fq.proto b/ydb/public/api/protos/fq.proto index d68ade17f2b..4424cd46aa9 100644 --- a/ydb/public/api/protos/fq.proto +++ b/ydb/public/api/protos/fq.proto @@ -629,6 +629,48 @@ message ObjectStorageBinding { map<string, string> format_setting = 3 [(Ydb.size).le = 100]; string compression = 4 [(Ydb.length).le = 1024]; Schema schema = 5; + + /* + Partition projection is used to speed up the processing of highly partitioned + storages and automate the management of partitions. In partition projection, partition values and + locations are calculated from configuration rather than read from an object storage. Depending on the + specific characteristics of the query and underlying data, partition projection can significantly + reduce query execution time if it uses partitioning constraints on partition metadata retrieval. Similar + functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html + Only enum, integer and date types are supported for path generation. When using projection, there must + be at least one element in partitioned_by. This behavior is introduced for symmetric query usage and + compatibility with Athena behavior. + + Example: + projection = { + "projection.enabled" : "true", // used to enable and disable partitioning + "projection.city.type" : "enum", // to generate the city column, the enum type will be used (enumeration of objects separated by commas) + "projection.city.values" : "Washington,Roma", // column values city Washington or Roma + "projection.code.type" : "enum", // to generate the code column, the enum type will be used (enumeration of objects separated by commas) + "projection.code.values" : "0,1", // column values code 0 or 1 + "storage.location.template" : "/${city}/${code}/${device_id}" // the template to which the generated values will be substituted + } + partitioned_by = [ "city", "device_id" ] // a subset of columns that are included in partitioning + - If storage.location.template and partitioned_by are specified together, then the rule from storage.location.template will be used. + - If only partitioned_by is specified, then the Hive Metastore format will be used for storage.location.template: "/city=${city}/device_id=${device_id}" + The list of paths that correspond to described projection and partitioned_by values are: + "/Washington/0/${device_id}", "/Washington/1/${device_id}", "/Roma/0/${device_id}", "/Roma/1/${device_id}" + */ + map<string, string> projection = 6; + + /* + By separating the data, it is possible to limit the amount of data scanned by each query, thereby improving + performance and reducing costs. Therefore, user data is partition by key (in practice, this is a partition by time). + The partitioned_by defines the keys on which to partition data. The columns described in partitioned_by + must be specified in the schema. If projection is not specified, the template will be generated according to + partitioned_by. Similar functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partitions.html + + Example: + partitioned_by = [ "city", "code", "device_id" ] + The corresponding storage.location.template will be as follows: + "/city=${city}/code=${code}/device_id=${device_id}" + */ + repeated string partitioned_by = 7; } repeated Subset subset = 1; |