aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/StorageSet.cpp
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/StorageSet.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/StorageSet.cpp')
-rw-r--r--contrib/clickhouse/src/Storages/StorageSet.cpp267
1 files changed, 267 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/StorageSet.cpp b/contrib/clickhouse/src/Storages/StorageSet.cpp
new file mode 100644
index 0000000000..79369ab4bc
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/StorageSet.cpp
@@ -0,0 +1,267 @@
+#include <Storages/StorageSet.h>
+#include <Storages/StorageFactory.h>
+#include <Compression/CompressedReadBuffer.h>
+#include <IO/WriteBufferFromFile.h>
+#include <Compression/CompressedWriteBuffer.h>
+#include <Formats/NativeWriter.h>
+#include <Formats/NativeReader.h>
+#include <QueryPipeline/ProfileInfo.h>
+#include <Disks/IDisk.h>
+#include <Common/formatReadable.h>
+#include <Common/StringUtils/StringUtils.h>
+#include <Interpreters/Context.h>
+#include <IO/ReadBufferFromFileBase.h>
+#include <Common/logger_useful.h>
+#include <Interpreters/Set.h>
+#include <Processors/Sinks/SinkToStorage.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
+
+namespace ErrorCodes
+{
+ extern const int INCORRECT_FILE_NAME;
+}
+
+
+class SetOrJoinSink : public SinkToStorage, WithContext
+{
+public:
+ SetOrJoinSink(
+ ContextPtr ctx, StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
+ const String & backup_path_, const String & backup_tmp_path_,
+ const String & backup_file_name_, bool persistent_);
+
+ String getName() const override { return "SetOrJoinSink"; }
+ void consume(Chunk chunk) override;
+ void onFinish() override;
+
+private:
+ StorageSetOrJoinBase & table;
+ StorageMetadataPtr metadata_snapshot;
+ String backup_path;
+ String backup_tmp_path;
+ String backup_file_name;
+ std::unique_ptr<WriteBufferFromFileBase> backup_buf;
+ CompressedWriteBuffer compressed_backup_buf;
+ NativeWriter backup_stream;
+ bool persistent;
+};
+
+
+SetOrJoinSink::SetOrJoinSink(
+ ContextPtr ctx,
+ StorageSetOrJoinBase & table_,
+ const StorageMetadataPtr & metadata_snapshot_,
+ const String & backup_path_,
+ const String & backup_tmp_path_,
+ const String & backup_file_name_,
+ bool persistent_)
+ : SinkToStorage(metadata_snapshot_->getSampleBlock())
+ , WithContext(ctx)
+ , table(table_)
+ , metadata_snapshot(metadata_snapshot_)
+ , backup_path(backup_path_)
+ , backup_tmp_path(backup_tmp_path_)
+ , backup_file_name(backup_file_name_)
+ , backup_buf(table_.disk->writeFile(fs::path(backup_tmp_path) / backup_file_name))
+ , compressed_backup_buf(*backup_buf)
+ , backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
+ , persistent(persistent_)
+{
+}
+
+void SetOrJoinSink::consume(Chunk chunk)
+{
+ Block block = getHeader().cloneWithColumns(chunk.detachColumns());
+
+ table.insertBlock(block, getContext());
+ if (persistent)
+ backup_stream.write(block);
+}
+
+void SetOrJoinSink::onFinish()
+{
+ table.finishInsert();
+ if (persistent)
+ {
+ backup_stream.flush();
+ compressed_backup_buf.next();
+ backup_buf->next();
+ backup_buf->finalize();
+
+ table.disk->replaceFile(fs::path(backup_tmp_path) / backup_file_name, fs::path(backup_path) / backup_file_name);
+ }
+}
+
+
+SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool /*async_insert*/)
+{
+ UInt64 id = ++increment;
+ return std::make_shared<SetOrJoinSink>(
+ context, *this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
+}
+
+
+StorageSetOrJoinBase::StorageSetOrJoinBase(
+ DiskPtr disk_,
+ const String & relative_path_,
+ const StorageID & table_id_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment,
+ bool persistent_)
+ : IStorage(table_id_), disk(disk_), persistent(persistent_)
+{
+ StorageInMemoryMetadata storage_metadata;
+ storage_metadata.setColumns(columns_);
+ storage_metadata.setConstraints(constraints_);
+ storage_metadata.setComment(comment);
+ setInMemoryMetadata(storage_metadata);
+
+
+ if (relative_path_.empty())
+ throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Join and Set storages require data path");
+
+ path = relative_path_;
+}
+
+
+StorageSet::StorageSet(
+ DiskPtr disk_,
+ const String & relative_path_,
+ const StorageID & table_id_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment,
+ bool persistent_)
+ : StorageSetOrJoinBase{disk_, relative_path_, table_id_, columns_, constraints_, comment, persistent_}
+ , set(std::make_shared<Set>(SizeLimits(), 0, true))
+{
+ Block header = getInMemoryMetadataPtr()->getSampleBlock();
+ set->setHeader(header.getColumnsWithTypeAndName());
+
+ restore();
+}
+
+
+void StorageSet::insertBlock(const Block & block, ContextPtr) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
+void StorageSet::finishInsert() { set->finishInsert(); }
+
+size_t StorageSet::getSize(ContextPtr) const { return set->getTotalRowCount(); }
+std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
+std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
+
+void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &)
+{
+ if (disk->exists(path))
+ disk->removeRecursive(path);
+ else
+ LOG_INFO(&Poco::Logger::get("StorageSet"), "Path {} is already removed from disk {}", path, disk->getName());
+
+ disk->createDirectories(path);
+ disk->createDirectories(fs::path(path) / "tmp/");
+
+ Block header = metadata_snapshot->getSampleBlock();
+
+ increment = 0;
+ set = std::make_shared<Set>(SizeLimits(), 0, true);
+ set->setHeader(header.getColumnsWithTypeAndName());
+}
+
+
+void StorageSetOrJoinBase::restore()
+{
+ if (!disk->exists(fs::path(path) / "tmp/"))
+ {
+ disk->createDirectories(fs::path(path) / "tmp/");
+ return;
+ }
+
+ static const char * file_suffix = ".bin";
+ static const auto file_suffix_size = strlen(".bin");
+
+ for (auto dir_it{disk->iterateDirectory(path)}; dir_it->isValid(); dir_it->next())
+ {
+ const auto & name = dir_it->name();
+ const auto & file_path = dir_it->path();
+
+ if (disk->isFile(file_path)
+ && endsWith(name, file_suffix)
+ && disk->getFileSize(file_path) > 0)
+ {
+ /// Calculate the maximum number of available files with a backup to add the following files with large numbers.
+ UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
+ if (file_num > increment)
+ increment = file_num;
+
+ restoreFromFile(dir_it->path());
+ }
+ }
+}
+
+
+void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
+{
+ ContextPtr ctx = nullptr;
+ auto backup_buf = disk->readFile(file_path);
+ CompressedReadBuffer compressed_backup_buf(*backup_buf);
+ NativeReader backup_stream(compressed_backup_buf, 0);
+
+ ProfileInfo info;
+ while (Block block = backup_stream.read())
+ {
+ info.update(block);
+ insertBlock(block, ctx);
+ }
+
+ finishInsert();
+
+ /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
+ LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
+ file_path, info.rows, ReadableSize(info.bytes), getSize(ctx));
+}
+
+
+void StorageSetOrJoinBase::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
+{
+ /// Rename directory with data.
+ disk->replaceFile(path, new_path_to_table_data);
+
+ path = new_path_to_table_data;
+ renameInMemory(new_table_id);
+}
+
+
+void registerStorageSet(StorageFactory & factory)
+{
+ factory.registerStorage("Set", [](const StorageFactory::Arguments & args)
+ {
+ if (!args.engine_args.empty())
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Engine {} doesn't support any arguments ({} given)",
+ args.engine_name, args.engine_args.size());
+
+ bool has_settings = args.storage_def->settings;
+ SetSettings set_settings;
+ if (has_settings)
+ set_settings.loadFromQuery(*args.storage_def);
+
+ DiskPtr disk = args.getContext()->getDisk(set_settings.disk);
+ return std::make_shared<StorageSet>(
+ disk, args.relative_data_path, args.table_id, args.columns, args.constraints, args.comment, set_settings.persistent);
+ }, StorageFactory::StorageFeatures{ .supports_settings = true, });
+}
+
+
+}