diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/StorageMerge.h | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/StorageMerge.h')
-rw-r--r-- | contrib/clickhouse/src/Storages/StorageMerge.h | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/StorageMerge.h b/contrib/clickhouse/src/Storages/StorageMerge.h new file mode 100644 index 0000000000..babf0dd92e --- /dev/null +++ b/contrib/clickhouse/src/Storages/StorageMerge.h @@ -0,0 +1,207 @@ +#pragma once + +#include <Common/OptimizedRegularExpression.h> +#include <Storages/SelectQueryInfo.h> +#include <Storages/IStorage.h> +#include <Processors/QueryPlan/SourceStepWithFilter.h> + + +namespace DB +{ + +struct QueryPlanResourceHolder; + +/** A table that represents the union of an arbitrary number of other tables. + * All tables must have the same structure. + */ +class StorageMerge final : public IStorage, WithContext +{ +public: + using DBToTableSetMap = std::map<String, std::set<String>>; + + StorageMerge( + const StorageID & table_id_, + const ColumnsDescription & columns_, + const String & comment, + const String & source_database_name_or_regexp_, + bool database_is_regexp_, + const DBToTableSetMap & source_databases_and_tables_, + ContextPtr context_); + + StorageMerge( + const StorageID & table_id_, + const ColumnsDescription & columns_, + const String & comment, + const String & source_database_name_or_regexp_, + bool database_is_regexp_, + const String & source_table_regexp_, + ContextPtr context_); + + std::string getName() const override { return "Merge"; } + + bool isRemote() const override; + + /// The check is delayed to the read method. It checks the support of the tables used. + bool supportsSampling() const override { return true; } + bool supportsFinal() const override { return true; } + bool supportsIndexForIn() const override { return true; } + bool supportsSubcolumns() const override { return true; } + bool supportsPrewhere() const override { return true; } + std::optional<NameSet> supportedPrewhereColumns() const override; + + bool canMoveConditionsToPrewhere() const override; + + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + + void read( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + + void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override; + + /// you need to add and remove columns in the sub-tables manually + /// the structure of sub-tables is not checked + void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override; + + bool mayBenefitFromIndexForIn( + const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override; + + /// Evaluate database name or regexp for StorageMerge and TableFunction merge + static std::tuple<bool /* is_regexp */, ASTPtr> evaluateDatabaseName(const ASTPtr & node, ContextPtr context); + +private: + std::optional<OptimizedRegularExpression> source_database_regexp; + std::optional<OptimizedRegularExpression> source_table_regexp; + std::optional<DBToTableSetMap> source_databases_and_tables; + + String source_database_name_or_regexp; + bool database_is_regexp = false; + + /// (Database, Table, Lock, TableName) + using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>; + using StorageListWithLocks = std::list<StorageWithLockAndName>; + using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>; + + StorageMerge::StorageListWithLocks getSelectedTables( + ContextPtr query_context, + const ASTPtr & query = nullptr, + bool filter_by_database_virtual_column = false, + bool filter_by_table_virtual_column = false) const; + + template <typename F> + StoragePtr getFirstTable(F && predicate) const; + + template <typename F> + void forEachTable(F && func) const; + + DatabaseTablesIteratorPtr getDatabaseIterator(const String & database_name, ContextPtr context) const; + + DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const; + + NamesAndTypesList getVirtuals() const override; + ColumnSizeByName getColumnSizes() const override; + + ColumnsDescription getColumnsDescriptionFromSourceTables() const; + + bool tableSupportsPrewhere() const; + + friend class ReadFromMerge; +}; + +class ReadFromMerge final : public SourceStepWithFilter +{ +public: + static constexpr auto name = "ReadFromMerge"; + String getName() const override { return name; } + + using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>; + using StorageListWithLocks = std::list<StorageWithLockAndName>; + using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>; + + ReadFromMerge( + Block common_header_, + StorageListWithLocks selected_tables_, + Names column_names_, + bool has_database_virtual_column_, + bool has_table_virtual_column_, + size_t max_block_size, + size_t num_streams, + StoragePtr storage, + StorageSnapshotPtr storage_snapshot, + const SelectQueryInfo & query_info_, + ContextMutablePtr context_, + QueryProcessingStage::Enum processed_stage); + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + + const StorageListWithLocks & getSelectedTables() const { return selected_tables; } + + /// Returns `false` if requested reading cannot be performed. + bool requestReadingInOrder(InputOrderInfoPtr order_info_); + +private: + const size_t required_max_block_size; + const size_t requested_num_streams; + const Block common_header; + + StorageListWithLocks selected_tables; + Names column_names; + bool has_database_virtual_column; + bool has_table_virtual_column; + StoragePtr storage_merge; + StorageSnapshotPtr merge_storage_snapshot; + + /// Store read plan for each child table. + /// It's needed to guarantee lifetime for child steps to be the same as for this step (mainly for EXPLAIN PIPELINE). + std::vector<QueryPlan> child_plans; + + SelectQueryInfo query_info; + ContextMutablePtr context; + QueryProcessingStage::Enum common_processed_stage; + + InputOrderInfoPtr order_info; + + struct AliasData + { + String name; + DataTypePtr type; + ASTPtr expression; + }; + + using Aliases = std::vector<AliasData>; + + static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info, + const ContextPtr & modified_context, + const StorageWithLockAndName & storage_with_lock_and_name, + const StorageSnapshotPtr & storage_snapshot); + + QueryPipelineBuilderPtr createSources( + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + const QueryProcessingStage::Enum & processed_stage, + UInt64 max_block_size, + const Block & header, + const Aliases & aliases, + const StorageWithLockAndName & storage_with_lock, + Names real_column_names, + ContextMutablePtr modified_context, + size_t streams_num, + bool concat_streams = false); + + static void convertingSourceStream( + const Block & header, + const StorageMetadataPtr & metadata_snapshot, + const Aliases & aliases, + ContextPtr context, + QueryPipelineBuilder & builder, + const QueryProcessingStage::Enum & processed_stage); +}; + +} |