diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/StorageSet.cpp | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/StorageSet.cpp')
-rw-r--r-- | contrib/clickhouse/src/Storages/StorageSet.cpp | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/StorageSet.cpp b/contrib/clickhouse/src/Storages/StorageSet.cpp new file mode 100644 index 0000000000..79369ab4bc --- /dev/null +++ b/contrib/clickhouse/src/Storages/StorageSet.cpp @@ -0,0 +1,267 @@ +#include <Storages/StorageSet.h> +#include <Storages/StorageFactory.h> +#include <Compression/CompressedReadBuffer.h> +#include <IO/WriteBufferFromFile.h> +#include <Compression/CompressedWriteBuffer.h> +#include <Formats/NativeWriter.h> +#include <Formats/NativeReader.h> +#include <QueryPipeline/ProfileInfo.h> +#include <Disks/IDisk.h> +#include <Common/formatReadable.h> +#include <Common/StringUtils/StringUtils.h> +#include <Interpreters/Context.h> +#include <IO/ReadBufferFromFileBase.h> +#include <Common/logger_useful.h> +#include <Interpreters/Set.h> +#include <Processors/Sinks/SinkToStorage.h> +#include <Parsers/ASTCreateQuery.h> +#include <filesystem> + +namespace fs = std::filesystem; + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + + +namespace ErrorCodes +{ + extern const int INCORRECT_FILE_NAME; +} + + +class SetOrJoinSink : public SinkToStorage, WithContext +{ +public: + SetOrJoinSink( + ContextPtr ctx, StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_, + const String & backup_path_, const String & backup_tmp_path_, + const String & backup_file_name_, bool persistent_); + + String getName() const override { return "SetOrJoinSink"; } + void consume(Chunk chunk) override; + void onFinish() override; + +private: + StorageSetOrJoinBase & table; + StorageMetadataPtr metadata_snapshot; + String backup_path; + String backup_tmp_path; + String backup_file_name; + std::unique_ptr<WriteBufferFromFileBase> backup_buf; + CompressedWriteBuffer compressed_backup_buf; + NativeWriter backup_stream; + bool persistent; +}; + + +SetOrJoinSink::SetOrJoinSink( + ContextPtr ctx, + StorageSetOrJoinBase & table_, + const StorageMetadataPtr & metadata_snapshot_, + const String & backup_path_, + const String & backup_tmp_path_, + const String & backup_file_name_, + bool persistent_) + : SinkToStorage(metadata_snapshot_->getSampleBlock()) + , WithContext(ctx) + , table(table_) + , metadata_snapshot(metadata_snapshot_) + , backup_path(backup_path_) + , backup_tmp_path(backup_tmp_path_) + , backup_file_name(backup_file_name_) + , backup_buf(table_.disk->writeFile(fs::path(backup_tmp_path) / backup_file_name)) + , compressed_backup_buf(*backup_buf) + , backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock()) + , persistent(persistent_) +{ +} + +void SetOrJoinSink::consume(Chunk chunk) +{ + Block block = getHeader().cloneWithColumns(chunk.detachColumns()); + + table.insertBlock(block, getContext()); + if (persistent) + backup_stream.write(block); +} + +void SetOrJoinSink::onFinish() +{ + table.finishInsert(); + if (persistent) + { + backup_stream.flush(); + compressed_backup_buf.next(); + backup_buf->next(); + backup_buf->finalize(); + + table.disk->replaceFile(fs::path(backup_tmp_path) / backup_file_name, fs::path(backup_path) / backup_file_name); + } +} + + +SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool /*async_insert*/) +{ + UInt64 id = ++increment; + return std::make_shared<SetOrJoinSink>( + context, *this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent); +} + + +StorageSetOrJoinBase::StorageSetOrJoinBase( + DiskPtr disk_, + const String & relative_path_, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment, + bool persistent_) + : IStorage(table_id_), disk(disk_), persistent(persistent_) +{ + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + + if (relative_path_.empty()) + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Join and Set storages require data path"); + + path = relative_path_; +} + + +StorageSet::StorageSet( + DiskPtr disk_, + const String & relative_path_, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment, + bool persistent_) + : StorageSetOrJoinBase{disk_, relative_path_, table_id_, columns_, constraints_, comment, persistent_} + , set(std::make_shared<Set>(SizeLimits(), 0, true)) +{ + Block header = getInMemoryMetadataPtr()->getSampleBlock(); + set->setHeader(header.getColumnsWithTypeAndName()); + + restore(); +} + + +void StorageSet::insertBlock(const Block & block, ContextPtr) { set->insertFromBlock(block.getColumnsWithTypeAndName()); } +void StorageSet::finishInsert() { set->finishInsert(); } + +size_t StorageSet::getSize(ContextPtr) const { return set->getTotalRowCount(); } +std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); } +std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); } + +void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) +{ + if (disk->exists(path)) + disk->removeRecursive(path); + else + LOG_INFO(&Poco::Logger::get("StorageSet"), "Path {} is already removed from disk {}", path, disk->getName()); + + disk->createDirectories(path); + disk->createDirectories(fs::path(path) / "tmp/"); + + Block header = metadata_snapshot->getSampleBlock(); + + increment = 0; + set = std::make_shared<Set>(SizeLimits(), 0, true); + set->setHeader(header.getColumnsWithTypeAndName()); +} + + +void StorageSetOrJoinBase::restore() +{ + if (!disk->exists(fs::path(path) / "tmp/")) + { + disk->createDirectories(fs::path(path) / "tmp/"); + return; + } + + static const char * file_suffix = ".bin"; + static const auto file_suffix_size = strlen(".bin"); + + for (auto dir_it{disk->iterateDirectory(path)}; dir_it->isValid(); dir_it->next()) + { + const auto & name = dir_it->name(); + const auto & file_path = dir_it->path(); + + if (disk->isFile(file_path) + && endsWith(name, file_suffix) + && disk->getFileSize(file_path) > 0) + { + /// Calculate the maximum number of available files with a backup to add the following files with large numbers. + UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size)); + if (file_num > increment) + increment = file_num; + + restoreFromFile(dir_it->path()); + } + } +} + + +void StorageSetOrJoinBase::restoreFromFile(const String & file_path) +{ + ContextPtr ctx = nullptr; + auto backup_buf = disk->readFile(file_path); + CompressedReadBuffer compressed_backup_buf(*backup_buf); + NativeReader backup_stream(compressed_backup_buf, 0); + + ProfileInfo info; + while (Block block = backup_stream.read()) + { + info.update(block); + insertBlock(block, ctx); + } + + finishInsert(); + + /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project. + LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.", + file_path, info.rows, ReadableSize(info.bytes), getSize(ctx)); +} + + +void StorageSetOrJoinBase::rename(const String & new_path_to_table_data, const StorageID & new_table_id) +{ + /// Rename directory with data. + disk->replaceFile(path, new_path_to_table_data); + + path = new_path_to_table_data; + renameInMemory(new_table_id); +} + + +void registerStorageSet(StorageFactory & factory) +{ + factory.registerStorage("Set", [](const StorageFactory::Arguments & args) + { + if (!args.engine_args.empty()) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Engine {} doesn't support any arguments ({} given)", + args.engine_name, args.engine_args.size()); + + bool has_settings = args.storage_def->settings; + SetSettings set_settings; + if (has_settings) + set_settings.loadFromQuery(*args.storage_def); + + DiskPtr disk = args.getContext()->getDisk(set_settings.disk); + return std::make_shared<StorageSet>( + disk, args.relative_data_path, args.table_id, args.columns, args.constraints, args.comment, set_settings.persistent); + }, StorageFactory::StorageFeatures{ .supports_settings = true, }); +} + + +} |