aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/node_table_reader.h
blob: 4fe839eeb6b9074c2a5188d35d1c42f09392840a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#pragma once

#include "counting_raw_reader.h"

#include <yt/cpp/mapreduce/interface/io.h>

#include <library/cpp/yson/public.h>

#include <util/stream/input.h>
#include <util/generic/buffer.h>
#include <util/system/event.h>
#include <util/system/thread.h>

#include <atomic>

namespace NYT {

// Forward declarations. Within this header TRawTableReader appears only
// behind ::TIntrusivePtr and TRowBuilder only behind THolder.
class TRawTableReader;
class TRowBuilder;

////////////////////////////////////////////////////////////////////////////////

// A TNode together with an associated size value.
// NOTE(review): Size is presumably the row's size in bytes as seen by the
// parser - confirm against the code that fills it in.
struct TRowElement
{
    TNode Node;
    size_t Size = 0;

    // Puts the element back into its default state: an empty TNode and a
    // zero Size. Members are assigned in declaration order (Node, then Size).
    void Reset()
    {
        *this = TRowElement();
    }
};

////////////////////////////////////////////////////////////////////////////////

// INodeReaderImpl implementation declared on top of a raw table reader.
//
// Only declarations are visible in this header; the method bodies live in
// the corresponding source file. The data members show that the reader keeps
// a counting wrapper around the raw input, a YSON list parser, a row builder,
// and two optional row slots (Row_ / NextRow_).
class TNodeTableReader
    : public INodeReaderImpl
{
public:
    // Receives the raw reader by intrusive pointer (passed by value).
    // `explicit` prevents an implicit conversion from the pointer type.
    explicit TNodeTableReader(::TIntrusivePtr<TRawTableReader> input);
    ~TNodeTableReader() override;

    // Row access. GetRow() exposes a row by const reference; MoveRow() writes
    // into `*result` (presumably by moving the current row out - confirm in
    // the implementation).
    const TNode& GetRow() const override;
    void MoveRow(TNode* result) override;

    // Iteration and position queries required by INodeReaderImpl.
    bool IsValid() const override;
    void Next() override;
    ui32 GetTableIndex() const override;
    ui32 GetRangeIndex() const override;
    ui64 GetRowIndex() const override;
    i64 GetTabletIndex() const override;
    void NextKey() override;
    // The result is optional: the return type allows "no byte count".
    TMaybe<size_t> GetReadByteCount() const override;
    bool IsEndOfStream() const override;
    bool IsRawReaderExhausted() const override;

private:
    // Internal helpers; their behavior is defined in the source file.
    void NextImpl();
    // Takes both the exception object and a textual description of the error.
    void OnStreamError(std::exception_ptr exception, TString error);
    void CheckValidity() const;
    void PrepareParsing();
    void ParseListFragmentItem();
    void ParseFirstListFragmentItem();

private:
    // Counting wrapper over the raw input (presumably the source of the value
    // reported by GetReadByteCount() - confirm).
    NDetail::TCountingRawTableReader Input_;

    // Reader state flags and position attributes. The in-class initializers
    // are the values a freshly constructed reader starts with, unless the
    // constructor overrides them.
    bool Valid_ = true;
    bool Finished_ = false;
    ui32 TableIndex_ = 0;
    // Optional: these indices start out empty.
    TMaybe<ui64> RowIndex_;
    TMaybe<ui32> RangeIndex_;
    TMaybe<i64> TabletIndex_;
    bool IsEndOfStream_ = false;
    bool AtStart_ = true;

    // Two optional row slots.
    // NOTE(review): looks like a current row plus a one-row lookahead -
    // confirm against NextImpl().
    TMaybe<TRowElement> Row_;
    TMaybe<TRowElement> NextRow_;

    // Owned helper objects. TRowBuilder is only forward-declared in this
    // header, so its definition must be visible wherever the destructor is
    // defined.
    THolder<TRowBuilder> Builder_;
    THolder<::NYson::TYsonListParser> Parser_;

    // NOTE(review): presumably holds the exception passed to OnStreamError()
    // so it can be handled later - confirm in the implementation.
    std::exception_ptr Exception_;
    bool NeedParseFirst_ = true;
    bool IsLast_ = false;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT