summaryrefslogtreecommitdiffstats
path: root/contrib/libs/apache/arrow_next/cpp/src/arrow/json/chunker.cc
blob: 7830ae4d47eb12cde88b1729d16cdb1f60512195 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/json/chunker.h"

#include <algorithm>
#include <string_view>
#include <utility>
#include <vector>

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/json/rapidjson_defs.h"
#include "rapidjson/reader.h"

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/buffer.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/json/options.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h"

namespace arrow20 {

using std::string_view;

namespace json {

namespace rj = arrow20::rapidjson;

/// Count the leading JSON whitespace characters (space, tab, CR, LF) in `view`.
///
/// Returns view.size() when the whole view is whitespace (including when it is
/// empty); otherwise the index of the first non-whitespace character.
static size_t ConsumeWhitespace(string_view view) {
#ifdef RAPIDJSON_SIMD
  // Use rapidjson's vectorized skipper over [begin, end); the distance to the
  // returned pointer is the whitespace run length.
  const char* const begin = view.data();
  const char* const end = begin + view.size();
  return static_cast<size_t>(rj::SkipWhitespace_SIMD(begin, end) - begin);
#else
  // Portable fallback: scan for the first character outside the JSON
  // whitespace set.
  const size_t first_non_ws = view.find_first_not_of(" \t\r\n");
  return first_non_ws == string_view::npos ? view.size() : first_non_ws;
#endif
}

/// RapidJson custom stream for reading JSON stored in multiple buffers
/// http://rapidjson.org/md_doc_stream.html#CustomStream
class MultiStringStream {
 public:
  using Ch = char;
  explicit MultiStringStream(std::vector<string_view> strings)
      : strings_(std::move(strings)) {
    std::reverse(strings_.begin(), strings_.end());
  }
  explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
    for (size_t i = 0; i < buffers.size(); ++i) {
      strings_[i] = string_view(*buffers[i]);
    }
    std::reverse(strings_.begin(), strings_.end());
  }
  char Peek() const {
    if (strings_.size() == 0) return '\0';
    return strings_.back()[0];
  }
  char Take() {
    if (strings_.size() == 0) return '\0';
    char taken = strings_.back()[0];
    if (strings_.back().size() == 1) {
      strings_.pop_back();
    } else {
      strings_.back() = strings_.back().substr(1);
    }
    ++index_;
    return taken;
  }
  size_t Tell() { return index_; }
  void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
  void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
  char* PutBegin() {
    ARROW_LOG(FATAL) << "not implemented";
    return nullptr;
  }
  size_t PutEnd(char*) {
    ARROW_LOG(FATAL) << "not implemented";
    return 0;
  }

 private:
  size_t index_ = 0;
  std::vector<string_view> strings_;
};

template <typename Stream>
static size_t ConsumeWholeObject(Stream&& stream) {
  static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
                                          rj::kParseStopWhenDoneFlag |
                                          rj::kParseNumbersAsStringsFlag;
  rj::BaseReaderHandler<rj::UTF8<>> handler;
  rj::Reader reader;
  // parse a single JSON object
  switch (reader.Parse<parse_flags>(stream, handler).Code()) {
    case rj::kParseErrorNone:
      return stream.Tell();
    case rj::kParseErrorDocumentEmpty:
      return 0;
    default:
      // rapidjson emitted an error, the most recent object was partial
      return string_view::npos;
  }
}

namespace {

// A BoundaryFinder implementation that assumes JSON objects can contain raw newlines,
// and uses actual JSON parsing to delimit them.
class ParsingBoundaryFinder final : public BoundaryFinder {
 public:
  /// Locate the end of the JSON value that starts in `partial` and continues
  /// into `block`.
  ///
  /// One value is parsed from the concatenation partial+block. On success
  /// *out_pos is the offset into `block` just past that value. If the parse
  /// fails (incomplete or malformed value), *out_pos is -1. Returns Invalid
  /// when the parsed extent ends before the end of `partial`.
  Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override {
    const size_t length = ConsumeWholeObject(MultiStringStream({partial, block}));
    if (length == string_view::npos) {
      *out_pos = -1;
    } else if (ARROW_PREDICT_FALSE(length < partial.size())) {
      return Status::Invalid("JSON chunk error: invalid data at end of document");
    } else {
      DCHECK_LE(length, partial.size() + block.size());
      *out_pos = static_cast<int64_t>(length - partial.size());
    }
    return Status::OK();
  }

  /// Locate the end of the last complete JSON value in `block`.
  ///
  /// Values are parsed one after another from the front of `block`. *out_pos
  /// is the offset just past the last complete value plus any whitespace that
  /// follows it, or -1 if no complete value could be parsed from the front.
  Status FindLast(std::string_view block, int64_t* out_pos) override {
    using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
    const size_t block_length = block.size();
    size_t consumed_length = 0;
    while (consumed_length < block_length) {
      // `block` has been advanced past every value parsed so far.
      rj::MemoryStream ms(block.data(), block.size());
      const size_t length = ConsumeWholeObject(InputStream(ms));
      if (length == string_view::npos || length == 0) {
        // found incomplete object or block is empty
        break;
      }
      consumed_length += length;
      block = block.substr(length);
    }
    if (consumed_length == 0) {
      *out_pos = -1;
    } else {
      // Include trailing whitespace so the next chunk starts at real content.
      consumed_length += ConsumeWhitespace(block);
      DCHECK_LE(consumed_length, block_length);
      *out_pos = static_cast<int64_t>(consumed_length);
    }
    return Status::OK();
  }

  Status FindNth(std::string_view /*partial*/, std::string_view /*block*/,
                 int64_t /*count*/, int64_t* /*out_pos*/,
                 int64_t* /*num_found*/) override {
    return Status::NotImplemented("ParsingBoundaryFinder::FindNth");
  }
};

}  // namespace

/// Build a Chunker whose boundary finder matches `options`.
///
/// With options.newlines_in_values set, objects may contain raw newlines, so
/// boundaries are found by parsing (ParsingBoundaryFinder). Otherwise the
/// finder returned by MakeNewlineBoundaryFinder() is used.
std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
  if (options.newlines_in_values) {
    return std::make_unique<Chunker>(std::make_shared<ParsingBoundaryFinder>());
  }
  return std::make_unique<Chunker>(MakeNewlineBoundaryFinder());
}

}  // namespace json
}  // namespace arrow20