aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Formats/TemporaryFileStreamLegacy.cpp
blob: e6651f0e83bc313fde12bb5436cfb3f5294ea96a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <Formats/TemporaryFileStreamLegacy.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Core/ProtocolDefines.h>


namespace DB
{

/// To read the data that was flushed into the temporary data file.
TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path)
    : file_in(path)
    , compressed_in(file_in)
    , block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}

TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path, const Block & header_)
    : file_in(path)
    , compressed_in(file_in)
    , block_in(std::make_unique<NativeReader>(compressed_in, header_, 0))
{}

/// Flush data from input stream into file for future reading
TemporaryFileStreamLegacy::Stat TemporaryFileStreamLegacy::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
    WriteBufferFromFile file_buf(path);
    CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
    NativeWriter output(compressed_buf, 0, header);

    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block))
        output.write(block);

    compressed_buf.finalize();
    return Stat{compressed_buf.getCompressedBytes(), compressed_buf.getUncompressedBytes()};
}

}