aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/StorageMerge.h
diff options
context:
space:
mode:
author   vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 09:58:56 +0300
committer vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 10:20:20 +0300
commit   c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree     cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/StorageMerge.h
parent   d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
download ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/StorageMerge.h')
-rw-r--r--  contrib/clickhouse/src/Storages/StorageMerge.h | 207
1 file changed, 207 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/StorageMerge.h b/contrib/clickhouse/src/Storages/StorageMerge.h
new file mode 100644
index 0000000000..babf0dd92e
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/StorageMerge.h
@@ -0,0 +1,207 @@
+#pragma once
+
+#include <Common/OptimizedRegularExpression.h>
+#include <Storages/SelectQueryInfo.h>
+#include <Storages/IStorage.h>
+#include <Processors/QueryPlan/SourceStepWithFilter.h>
+
+
+namespace DB
+{
+
+struct QueryPlanResourceHolder;
+
+/** A table that represents the union of an arbitrary number of other tables.
+ * All tables must have the same structure.
+ */
+class StorageMerge final : public IStorage, WithContext
+{
+public:
+    /// Database name -> set of table names to read from within that database.
+    using DBToTableSetMap = std::map<String, std::set<String>>;
+
+    /// Construct from an explicit database -> table-set mapping
+    /// (source_databases_and_tables_). The database part may still be a
+    /// regexp, signalled by database_is_regexp_.
+    StorageMerge(
+        const StorageID & table_id_,
+        const ColumnsDescription & columns_,
+        const String & comment,
+        const String & source_database_name_or_regexp_,
+        bool database_is_regexp_,
+        const DBToTableSetMap & source_databases_and_tables_,
+        ContextPtr context_);
+
+    /// Construct from a table-name regexp (source_table_regexp_) applied to
+    /// the database(s) selected by source_database_name_or_regexp_.
+    StorageMerge(
+        const StorageID & table_id_,
+        const ColumnsDescription & columns_,
+        const String & comment,
+        const String & source_database_name_or_regexp_,
+        bool database_is_regexp_,
+        const String & source_table_regexp_,
+        ContextPtr context_);
+
+    std::string getName() const override { return "Merge"; }
+
+    /// Defined out of line; presumably depends on the underlying tables — see the .cpp.
+    bool isRemote() const override;
+
+    /// The check is delayed to the read method. It checks the support of the tables used.
+    bool supportsSampling() const override { return true; }
+    bool supportsFinal() const override { return true; }
+    bool supportsIndexForIn() const override { return true; }
+    bool supportsSubcolumns() const override { return true; }
+    bool supportsPrewhere() const override { return true; }
+    /// Columns for which PREWHERE may be used; nullopt-vs-set semantics are
+    /// defined by IStorage (implementation in the .cpp).
+    std::optional<NameSet> supportedPrewhereColumns() const override;
+
+    bool canMoveConditionsToPrewhere() const override;
+
+    QueryProcessingStage::Enum
+    getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
+
+    /// Builds a query plan reading from the selected underlying tables
+    /// (see ReadFromMerge below, which this class befriends).
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        ContextPtr context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        size_t num_streams) override;
+
+    void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
+
+    /// you need to add and remove columns in the sub-tables manually
+    /// the structure of sub-tables is not checked
+    void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;
+
+    bool mayBenefitFromIndexForIn(
+        const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override;
+
+    /// Evaluate database name or regexp for StorageMerge and TableFunction merge
+    static std::tuple<bool /* is_regexp */, ASTPtr> evaluateDatabaseName(const ASTPtr & node, ContextPtr context);
+
+private:
+    /// Exactly one of the two selection mechanisms is active per constructor:
+    /// either regexps over database/table names, or an explicit map.
+    std::optional<OptimizedRegularExpression> source_database_regexp;
+    std::optional<OptimizedRegularExpression> source_table_regexp;
+    std::optional<DBToTableSetMap> source_databases_and_tables;
+
+    /// Raw database spec as given by the user; interpreted as a regexp
+    /// when database_is_regexp is true.
+    String source_database_name_or_regexp;
+    bool database_is_regexp = false;
+
+    /// (Database, Table, Lock, TableName)
+    using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
+    using StorageListWithLocks = std::list<StorageWithLockAndName>;
+    using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
+
+    /// Resolve the set of underlying tables for a query, taking table locks.
+    /// The filter flags likely prune by the _database/_table virtual columns
+    /// in `query` — confirm against the .cpp.
+    StorageMerge::StorageListWithLocks getSelectedTables(
+        ContextPtr query_context,
+        const ASTPtr & query = nullptr,
+        bool filter_by_database_virtual_column = false,
+        bool filter_by_table_virtual_column = false) const;
+
+    /// Returns the first underlying table satisfying `predicate` (template,
+    /// so defined in a place visible to users of this header).
+    template <typename F>
+    StoragePtr getFirstTable(F && predicate) const;
+
+    /// Applies `func` to each underlying table.
+    template <typename F>
+    void forEachTable(F && func) const;
+
+    DatabaseTablesIteratorPtr getDatabaseIterator(const String & database_name, ContextPtr context) const;
+
+    DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;
+
+    NamesAndTypesList getVirtuals() const override;
+    ColumnSizeByName getColumnSizes() const override;
+
+    ColumnsDescription getColumnsDescriptionFromSourceTables() const;
+
+    bool tableSupportsPrewhere() const;
+
+    /// ReadFromMerge needs access to the private selection helpers above.
+    friend class ReadFromMerge;
+};
+
+/// Query-plan step produced by StorageMerge::read: reads from every selected
+/// underlying table and unifies the streams under a common header.
+class ReadFromMerge final : public SourceStepWithFilter
+{
+public:
+    static constexpr auto name = "ReadFromMerge";
+    String getName() const override { return name; }
+
+    /// (Database, Table, Lock, TableName) — mirrors StorageMerge's private alias.
+    using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
+    using StorageListWithLocks = std::list<StorageWithLockAndName>;
+    using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
+
+    ReadFromMerge(
+        Block common_header_,
+        StorageListWithLocks selected_tables_,
+        Names column_names_,
+        bool has_database_virtual_column_,
+        bool has_table_virtual_column_,
+        size_t max_block_size,
+        size_t num_streams,
+        StoragePtr storage,
+        StorageSnapshotPtr storage_snapshot,
+        const SelectQueryInfo & query_info_,
+        ContextMutablePtr context_,
+        QueryProcessingStage::Enum processed_stage);
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+
+    const StorageListWithLocks & getSelectedTables() const { return selected_tables; }
+
+    /// Returns `false` if requested reading cannot be performed.
+    bool requestReadingInOrder(InputOrderInfoPtr order_info_);
+
+private:
+    const size_t required_max_block_size;
+    const size_t requested_num_streams;
+    /// Header every per-table stream is converted to (see convertingSourceStream).
+    const Block common_header;
+
+    StorageListWithLocks selected_tables;
+    Names column_names;
+    /// Whether the query references the _database / _table virtual columns.
+    bool has_database_virtual_column;
+    bool has_table_virtual_column;
+    /// The owning StorageMerge, kept alive for the lifetime of this step.
+    StoragePtr storage_merge;
+    StorageSnapshotPtr merge_storage_snapshot;
+
+    /// Store read plan for each child table.
+    /// It's needed to guarantee lifetime for child steps to be the same as for this step (mainly for EXPLAIN PIPELINE).
+    std::vector<QueryPlan> child_plans;
+
+    SelectQueryInfo query_info;
+    ContextMutablePtr context;
+    QueryProcessingStage::Enum common_processed_stage;
+
+    /// Set by requestReadingInOrder when ordered reading was requested.
+    InputOrderInfoPtr order_info;
+
+    /// An aliased column of an underlying table: its name, result type, and
+    /// the expression that computes it.
+    struct AliasData
+    {
+        String name;
+        DataTypePtr type;
+        ASTPtr expression;
+    };
+
+    using Aliases = std::vector<AliasData>;
+
+    /// Rewrites the incoming query info for one concrete underlying table —
+    /// see the .cpp for the exact transformation.
+    static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info,
+        const ContextPtr & modified_context,
+        const StorageWithLockAndName & storage_with_lock_and_name,
+        const StorageSnapshotPtr & storage_snapshot);
+
+    /// Builds the pipeline that reads one underlying table; when
+    /// concat_streams is true the table's streams are presumably merged into
+    /// one — confirm against the .cpp.
+    QueryPipelineBuilderPtr createSources(
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        const QueryProcessingStage::Enum & processed_stage,
+        UInt64 max_block_size,
+        const Block & header,
+        const Aliases & aliases,
+        const StorageWithLockAndName & storage_with_lock,
+        Names real_column_names,
+        ContextMutablePtr modified_context,
+        size_t streams_num,
+        bool concat_streams = false);
+
+    /// Converts a child table's output stream to `header`, applying `aliases`.
+    static void convertingSourceStream(
+        const Block & header,
+        const StorageMetadataPtr & metadata_snapshot,
+        const Aliases & aliases,
+        ContextPtr context,
+        QueryPipelineBuilder & builder,
+        const QueryProcessingStage::Enum & processed_stage);
+};
+
+}