aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/ucompress/writer.cpp
blob: 40f8b121081c3ea967fb3353e02ae56d6f3d8962 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include "writer.h"
#include "common.h"

#include <library/cpp/blockcodecs/codecs.h>
#include <library/cpp/json/writer/json.h>

#include <util/generic/scope.h>
#include <util/generic/yexception.h>
#include <util/system/byteorder.h>


using namespace NUCompress;

TCodedOutput::TCodedOutput(IOutputStream* out, const NBlockCodecs::ICodec* c, size_t bufLen)
    : C_(c)
    , D_(bufLen)
    , S_(out)
{
    Y_ENSURE_EX(C_, TBadArgumentException() << "Null codec");
    Y_ENSURE_EX(S_, TBadArgumentException() << "Null output stream");
    D_.Resize(bufLen);
    Y_ENSURE_EX(C_->MaxCompressedLength(D_) <= MaxCompressedLen, TBadArgumentException() << "Too big buffer size: " << bufLen);
    D_.Clear();
}

TCodedOutput::~TCodedOutput() {
    try {
        Finish();
    } catch (...) {
    }
}

void TCodedOutput::DoWrite(const void* buf, size_t len) {
    Y_ENSURE(S_, "Stream finished already");
    const char* in = static_cast<const char*>(buf);

    while (len) {
        const size_t avail = D_.Avail();
        if (len < avail) {
            D_.Append(in, len);
            return;
        }

        D_.Append(in, avail);
        Y_ASSERT(!D_.Avail());
        in += avail;
        len -= avail;

        FlushImpl();
    }
}

void TCodedOutput::FlushImpl() {
    if (!HdrWritten) {
        NJsonWriter::TBuf jBuf;
        jBuf.BeginObject();
        jBuf.WriteKey("codec");
        jBuf.WriteString(C_->Name());
        jBuf.EndObject();

        TString jStr = jBuf.Str() + '\n';
        const TBlockLen lenToSave = HostToLittle(jStr.length());
        S_->Write(&lenToSave, sizeof(lenToSave));
        S_->Write(jStr.Detach(), jStr.length());
        HdrWritten = true;
    }

    O_.Reserve(C_->MaxCompressedLength(D_));
    const size_t oLen = C_->Compress(D_, O_.Data());
    Y_ASSERT(oLen <= MaxCompressedLen);

    const TBlockLen lenToSave = HostToLittle(oLen);
    S_->Write(&lenToSave, sizeof(lenToSave));
    S_->Write(O_.Data(), oLen);

    D_.Clear();
    O_.Clear();
}

void TCodedOutput::DoFlush() {
    if (S_ && D_) {
        FlushImpl();
    }
}

void TCodedOutput::DoFinish() {
    if (S_) {
        Y_DEFER {
            S_ = nullptr;
        };
        FlushImpl();
        // Write zero-length block as EOF marker.
        FlushImpl();
    }
}