summaryrefslogtreecommitdiffstats
path: root/contrib/libs/apache/arrow/cpp
diff options
context:
space:
mode:
authorheretic <[email protected]>2022-09-01 11:18:57 +0300
committerheretic <[email protected]>2022-09-01 11:18:57 +0300
commit8393683e8cb62468ccace14fa3379e3a4fbdde73 (patch)
tree4f2d32a77665019c9491d34dbe1cc5e605bb220c /contrib/libs/apache/arrow/cpp
parent836e587fc927c87149f8f0b2676d2587e6a79111 (diff)
add apache arrow python
Diffstat (limited to 'contrib/libs/apache/arrow/cpp')
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc761
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h532
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc448
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h113
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc780
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h132
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc271
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h130
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h49
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc73
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h56
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc940
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc469
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h68
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc186
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h35
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc323
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h94
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc83
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h49
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc82
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h48
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc28
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/options.h74
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc1099
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h101
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h43
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc227
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h72
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h26
-rw-r--r--contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h89
31 files changed, 7481 insertions, 0 deletions
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc
new file mode 100644
index 00000000000..4f44e24ba6d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc
@@ -0,0 +1,761 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sstream>
+#include <utility>
+
+#include "arrow/util/config.h"
+
+#include "arrow/filesystem/filesystem.h"
+#ifdef ARROW_HDFS
+#error #include "arrow/filesystem/hdfs.h"
+#endif
+#ifdef ARROW_S3
+#error #include "arrow/filesystem/s3fs.h"
+#endif
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/io/slow.h"
+#include "arrow/io/util_internal.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/parallel.h"
+#include "arrow/util/uri.h"
+#include "arrow/util/vector.h"
+#include "arrow/util/windows_fixup.h"
+
+namespace arrow {
+
+using internal::checked_pointer_cast;
+using internal::TaskHints;
+using internal::Uri;
+using io::internal::SubmitIO;
+
+namespace fs {
+
+using internal::ConcatAbstractPath;
+using internal::EnsureTrailingSlash;
+using internal::GetAbstractPathParent;
+using internal::kSep;
+using internal::RemoveLeadingSlash;
+using internal::RemoveTrailingSlash;
+using internal::ToSlashes;
+
+// Return the lower-case textual label for a FileType enumerator.
+//
+// An unrecognized value triggers a FATAL log entry; the "???" placeholder
+// only exists so that every control path returns a value.
+std::string ToString(FileType ftype) {
+  if (ftype == FileType::NotFound) {
+    return "not-found";
+  }
+  if (ftype == FileType::Unknown) {
+    return "unknown";
+  }
+  if (ftype == FileType::File) {
+    return "file";
+  }
+  if (ftype == FileType::Directory) {
+    return "directory";
+  }
+  ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
+  return "???";
+}
+
+// Stream a FileType as "FileType::<Enumerator>".
+// Provided for googletest, which uses operator<< to print values.
+//
+// The local FILE_TYPE_CASE macro expands to one `case` label per enumerator
+// and stringifies the enumerator name with ARROW_STRINGIFY. A value outside
+// the four known enumerators is reported through a FATAL log entry.
+ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType ftype) {
+#define FILE_TYPE_CASE(value_name) \
+ case FileType::value_name: \
+ os << "FileType::" ARROW_STRINGIFY(value_name); \
+ break;
+
+ switch (ftype) {
+ FILE_TYPE_CASE(NotFound)
+ FILE_TYPE_CASE(Unknown)
+ FILE_TYPE_CASE(File)
+ FILE_TYPE_CASE(Directory)
+ default:
+ ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
+ }
+
+// The helper macro is only meaningful inside this function.
+#undef FILE_TYPE_CASE
+ return os;
+}
+
+// Return the second component of the pair produced by splitting path_ with
+// GetAbstractPathParent().
+std::string FileInfo::base_name() const {
+  auto split = internal::GetAbstractPathParent(path_);
+  return std::move(split.second);
+}
+
+// Return the first component of the pair produced by splitting path_ with
+// GetAbstractPathParent().
+std::string FileInfo::dir_name() const {
+  auto split = internal::GetAbstractPathParent(path_);
+  return std::move(split.first);
+}
+
+// Debug helper: render this FileInfo exactly as operator<< would print it.
+std::string FileInfo::ToString() const {
+  std::ostringstream rendered;
+  rendered << *this;
+  return rendered.str();
+}
+
+// Print a FileInfo as "FileInfo(<type>, <path>)".
+std::ostream& operator<<(std::ostream& os, const FileInfo& info) {
+  os << "FileInfo(" << info.type();
+  os << ", " << info.path();
+  os << ")";
+  return os;
+}
+
+// Return the extension of path_ as computed by GetAbstractPathExtension().
+std::string FileInfo::extension() const {
+  std::string ext = internal::GetAbstractPathExtension(path_);
+  return ext;
+}
+
+//////////////////////////////////////////////////////////////////////////
+// FileSystem default method implementations
+
+// Out-of-line destructor definition; no cleanup beyond member destruction.
+FileSystem::~FileSystem() = default;
+
+// Default normalization: hand the path back unchanged.
+Result<std::string> FileSystem::NormalizePath(std::string path) {
+  return std::move(path);
+}
+
+// Default multi-path lookup: query each path in order with the single-path
+// overload and stop at the first error.
+Result<std::vector<FileInfo>> FileSystem::GetFileInfo(
+    const std::vector<std::string>& paths) {
+  std::vector<FileInfo> infos;
+  infos.reserve(paths.size());
+  for (const std::string& one_path : paths) {
+    ARROW_ASSIGN_OR_RAISE(FileInfo entry, GetFileInfo(one_path));
+    infos.emplace_back(std::move(entry));
+  }
+  return infos;
+}
+
+namespace {
+
+// Run `func(self)` either inline or on the filesystem's I/O executor.
+//
+// `self` is a shared_ptr obtained from fs->shared_from_this(), so the
+// filesystem stays alive for as long as the task holds it.
+//
+// - synchronous == true: `func` is invoked immediately on the calling thread
+//   and its result is converted to the declared return type.
+// - synchronous == false: `func` is submitted through io::internal::SubmitIO
+//   using fs->io_context(), and the submission result is passed to DeferNotOk.
+//
+// The return type is spelled with decltype so that it matches what the
+// asynchronous path yields for this particular `func`.
+template <typename DeferredFunc>
+auto FileSystemDefer(FileSystem* fs, bool synchronous, DeferredFunc&& func)
+ -> decltype(DeferNotOk(
+ fs->io_context().executor()->Submit(func, std::shared_ptr<FileSystem>{}))) {
+ auto self = fs->shared_from_this();
+ if (synchronous) {
+ return std::forward<DeferredFunc>(func)(std::move(self));
+ }
+ return DeferNotOk(io::internal::SubmitIO(
+ fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
+}
+
+} // namespace
+
+// Default async lookup: defer the blocking multi-path GetFileInfo() through
+// FileSystemDefer. The task keeps its own copy of `paths`.
+Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(
+    const std::vector<std::string>& paths) {
+  auto lookup = [paths](std::shared_ptr<FileSystem> self) {
+    return self->GetFileInfo(paths);
+  };
+  return FileSystemDefer(this, default_async_is_sync_, std::move(lookup));
+}
+
+// Default generator: defer the blocking selector-based GetFileInfo() and wrap
+// the resulting future in a single-future generator. The task keeps its own
+// copy of the selector.
+FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto listing = [select](std::shared_ptr<FileSystem> self) {
+    return self->GetFileInfo(select);
+  };
+  auto pending = FileSystemDefer(this, default_async_is_sync_, std::move(listing));
+  return MakeSingleFutureGenerator(std::move(pending));
+}
+
+// Default bulk delete: call DeleteFile() on every path, even after a failure,
+// folding the individual statuses together with Status::operator&=.
+Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
+  Status combined = Status::OK();
+  for (auto it = paths.begin(); it != paths.end(); ++it) {
+    combined &= DeleteFile(*it);
+  }
+  return combined;
+}
+
+namespace {
+
+// Check that `info` may be opened for reading.
+//
+// NotFound yields a PathNotFound error, File and Unknown are accepted, and
+// every other type yields a NotAFile error.
+Status ValidateInputFileInfo(const FileInfo& info) {
+  switch (info.type()) {
+    case FileType::NotFound:
+      return internal::PathNotFound(info.path());
+    case FileType::File:
+    case FileType::Unknown:
+      return Status::OK();
+    default:
+      return internal::NotAFile(info.path());
+  }
+}
+
+} // namespace
+
+// Default FileInfo overload: validate the entry, then open it by path.
+Result<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStream(
+    const FileInfo& info) {
+  RETURN_NOT_OK(ValidateInputFileInfo(info));
+  const std::string& target = info.path();
+  return OpenInputStream(target);
+}
+
+// Default FileInfo overload: validate the entry, then open it by path.
+Result<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFile(
+    const FileInfo& info) {
+  RETURN_NOT_OK(ValidateInputFileInfo(info));
+  const std::string& target = info.path();
+  return OpenInputFile(target);
+}
+
+// Default async open by path: defer the blocking OpenInputStream() through
+// FileSystemDefer. The task keeps its own copy of `path`.
+Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
+    const std::string& path) {
+  auto open_task = [path](std::shared_ptr<FileSystem> self) {
+    return self->OpenInputStream(path);
+  };
+  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
+}
+
+// Default async open by FileInfo: validate eagerly on the calling thread,
+// then defer the blocking OpenInputStream(info) through FileSystemDefer.
+Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
+    const FileInfo& info) {
+  RETURN_NOT_OK(ValidateInputFileInfo(info));
+  auto open_task = [info](std::shared_ptr<FileSystem> self) {
+    return self->OpenInputStream(info);
+  };
+  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
+}
+
+// Default async open by path: defer the blocking OpenInputFile() through
+// FileSystemDefer. The task keeps its own copy of `path`.
+Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
+    const std::string& path) {
+  auto open_task = [path](std::shared_ptr<FileSystem> self) {
+    return self->OpenInputFile(path);
+  };
+  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
+}
+
+// Default async open by FileInfo: validate eagerly on the calling thread,
+// then defer the blocking OpenInputFile(info) through FileSystemDefer.
+Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
+    const FileInfo& info) {
+  RETURN_NOT_OK(ValidateInputFileInfo(info));
+  auto open_task = [info](std::shared_ptr<FileSystem> self) {
+    return self->OpenInputFile(info);
+  };
+  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
+}
+
+// Convenience overload: open an output stream with a null metadata pointer.
+Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenOutputStream(
+    const std::string& path) {
+  const std::shared_ptr<const KeyValueMetadata> no_metadata = nullptr;
+  return OpenOutputStream(path, no_metadata);
+}
+
+// Convenience overload: open an append stream with a null metadata pointer.
+Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenAppendStream(
+    const std::string& path) {
+  const std::shared_ptr<const KeyValueMetadata> no_metadata = nullptr;
+  return OpenAppendStream(path, no_metadata);
+}
+
+//////////////////////////////////////////////////////////////////////////
+// SubTreeFileSystem implementation
+
+// Build a filesystem view rooted at `base_path` inside `base_fs`.
+//
+// The IOContext is inherited from `base_fs`. The base path is normalized via
+// NormalizeBasePath(); ValueOrDie() means a normalization failure is not
+// reported to the caller as a Status.
+//
+// NOTE: `base_fs` is read by both member initializers, so it is copied (not
+// moved) into base_fs_; moving would only be safe if base_fs_ is initialized
+// after base_path_, which depends on the declaration order in the header.
+SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
+ std::shared_ptr<FileSystem> base_fs)
+ : FileSystem(base_fs->io_context()),
+ base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()),
+ base_fs_(base_fs) {}
+
+// Out-of-line destructor definition; no cleanup beyond member destruction.
+SubTreeFileSystem::~SubTreeFileSystem() = default;
+
+// Normalize `base_path` with the base filesystem's own NormalizePath(), then
+// pass the result through EnsureTrailingSlash().
+Result<std::string> SubTreeFileSystem::NormalizeBasePath(
+    std::string base_path, const std::shared_ptr<FileSystem>& base_fs) {
+  ARROW_ASSIGN_OR_RAISE(std::string normalized,
+                        base_fs->NormalizePath(std::move(base_path)));
+  return EnsureTrailingSlash(normalized);
+}
+
+// Two subtree filesystems are equal when they are the same object, or when
+// they share a type name, an identical base path and equal base filesystems.
+bool SubTreeFileSystem::Equals(const FileSystem& other) const {
+  if (&other == this) {
+    return true;
+  }
+  if (type_name() != other.type_name()) {
+    return false;
+  }
+  // The type-name check above is what justifies the downcast.
+  const auto& rhs = ::arrow::internal::checked_cast<const SubTreeFileSystem&>(other);
+  if (base_path_ != rhs.base_path_) {
+    return false;
+  }
+  return base_fs_->Equals(rhs.base_fs_);
+}
+
+// Translate a subtree-relative path into a base-filesystem path.
+// An empty input maps to the base path itself.
+std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
+  if (!s.empty()) {
+    return ConcatAbstractPath(base_path_, s);
+  }
+  return base_path_;
+}
+
+// In-place variant of PrependBase() that rejects an empty path with an
+// IOError instead of mapping it to the base path. `*s` is left untouched on
+// error.
+Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
+  if (s->empty()) {
+    return Status::IOError("Empty path");
+  }
+  *s = ConcatAbstractPath(base_path_, *s);
+  return Status::OK();
+}
+
+// Remove the base-path prefix from a path returned by the base filesystem.
+// A path that does not start with base_path_ produces an UnknownError.
+Result<std::string> SubTreeFileSystem::StripBase(const std::string& s) const {
+  const auto prefix_len = base_path_.length();
+  // Note base_path_ ends with a slash (if not empty)
+  const bool has_prefix =
+      s.length() >= prefix_len && s.compare(0, prefix_len, base_path_) == 0;
+  if (!has_prefix) {
+    return Status::UnknownError("Underlying filesystem returned path '", s,
+                                "', which is not a subpath of '", base_path_, "'");
+  }
+  return s.substr(prefix_len);
+}
+
+// Rewrite the path stored in `*info` so that it is relative to the subtree.
+// StripBase() errors are propagated and leave `*info` unchanged.
+Status SubTreeFileSystem::FixInfo(FileInfo* info) const {
+  ARROW_ASSIGN_OR_RAISE(std::string relative, StripBase(info->path()));
+  info->set_path(std::move(relative));
+  return Status::OK();
+}
+
+// Normalize in base-filesystem terms (prefix with the base path, delegate to
+// the base filesystem), then strip the base path again.
+Result<std::string> SubTreeFileSystem::NormalizePath(std::string path) {
+  std::string full_path = PrependBase(path);
+  ARROW_ASSIGN_OR_RAISE(std::string normalized,
+                        base_fs_->NormalizePath(std::move(full_path)));
+  return StripBase(normalized);
+}
+
+// Look up a single entry in the base filesystem and report it with a
+// subtree-relative path.
+Result<FileInfo> SubTreeFileSystem::GetFileInfo(const std::string& path) {
+  const std::string full_path = PrependBase(path);
+  ARROW_ASSIGN_OR_RAISE(FileInfo entry, base_fs_->GetFileInfo(full_path));
+  RETURN_NOT_OK(FixInfo(&entry));
+  return entry;
+}
+
+// Run a selector against the base filesystem with its base_dir rebased under
+// the subtree root, then make every returned path subtree-relative.
+Result<std::vector<FileInfo>> SubTreeFileSystem::GetFileInfo(const FileSelector& select) {
+  FileSelector rebased = select;
+  rebased.base_dir = PrependBase(select.base_dir);
+  ARROW_ASSIGN_OR_RAISE(auto entries, base_fs_->GetFileInfo(rebased));
+  for (auto it = entries.begin(); it != entries.end(); ++it) {
+    RETURN_NOT_OK(FixInfo(&*it));
+  }
+  return entries;
+}
+
+// Asynchronous counterpart of the selector-based GetFileInfo().
+//
+// The selector's base_dir is rebased under the subtree root and handed to the
+// base filesystem's generator; every batch it produces is then mapped through
+// FixInfo() so that the paths become subtree-relative.
+FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto selector = select;
+ selector.base_dir = PrependBase(selector.base_dir);
+ auto gen = base_fs_->GetFileInfoGenerator(selector);
+
+ // The mapping callback captures a shared_ptr to this filesystem, which keeps
+ // it alive for as long as the callback exists.
+ auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this());
+
+ // The lambda takes the batch by value, giving it a private copy whose
+ // entries can be modified in place.
+ std::function<Result<std::vector<FileInfo>>(const std::vector<FileInfo>& infos)>
+ fix_infos = [self](std::vector<FileInfo> infos) -> Result<std::vector<FileInfo>> {
+ for (auto& info : infos) {
+ RETURN_NOT_OK(self->FixInfo(&info));
+ }
+ return infos;
+ };
+ return MakeMappedGenerator(gen, fix_infos);
+}
+
+// Create a directory inside the subtree; an empty path is rejected.
+Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->CreateDir(full_path, recursive);
+}
+
+// Delete a directory inside the subtree; an empty path is rejected.
+Status SubTreeFileSystem::DeleteDir(const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->DeleteDir(full_path);
+}
+
+// Delete the contents of a directory inside the subtree. A path that
+// IsEmptyPath() accepts is refused with InvalidDeleteDirContents.
+Status SubTreeFileSystem::DeleteDirContents(const std::string& path) {
+  const bool targets_root = internal::IsEmptyPath(path);
+  if (targets_root) {
+    return internal::InvalidDeleteDirContents(path);
+  }
+  const std::string full_path = PrependBase(path);
+  return base_fs_->DeleteDirContents(full_path);
+}
+
+// Delete everything under the subtree root. With an empty base path this
+// forwards to the base filesystem's DeleteRootDirContents(); otherwise it
+// deletes the contents of the base directory.
+Status SubTreeFileSystem::DeleteRootDirContents() {
+  if (!base_path_.empty()) {
+    return base_fs_->DeleteDirContents(base_path_);
+  }
+  return base_fs_->DeleteRootDirContents();
+}
+
+// Delete a file inside the subtree; an empty path is rejected.
+Status SubTreeFileSystem::DeleteFile(const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->DeleteFile(full_path);
+}
+
+// Move an entry within the subtree. Both paths must be non-empty; the source
+// is checked before the destination.
+Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) {
+  std::string full_src(src);
+  std::string full_dest(dest);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
+  return base_fs_->Move(full_src, full_dest);
+}
+
+// Copy a file within the subtree. Both paths must be non-empty; the source is
+// checked before the destination.
+Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  std::string full_src(src);
+  std::string full_dest(dest);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
+  return base_fs_->CopyFile(full_src, full_dest);
+}
+
+// Open a sequential input stream for a non-empty subtree-relative path.
+Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
+    const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputStream(full_path);
+}
+
+// Open a sequential input stream from a FileInfo. A copy of `info` with its
+// path rebased under the subtree root is forwarded to the base filesystem.
+Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
+    const FileInfo& info) {
+  std::string full_path = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  FileInfo rebased(info);
+  rebased.set_path(std::move(full_path));
+  return base_fs_->OpenInputStream(rebased);
+}
+
+// Async open of a sequential input stream for a non-empty subtree-relative
+// path; delegates to the base filesystem's async implementation.
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+    const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputStreamAsync(full_path);
+}
+
+// Async open of a sequential input stream from a FileInfo. A copy of `info`
+// with its path rebased under the subtree root is forwarded to the base
+// filesystem's async implementation.
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+    const FileInfo& info) {
+  std::string full_path = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  FileInfo rebased(info);
+  rebased.set_path(std::move(full_path));
+  return base_fs_->OpenInputStreamAsync(rebased);
+}
+
+// Open a random-access file for a non-empty subtree-relative path.
+Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
+    const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputFile(full_path);
+}
+
+// Open a random-access file from a FileInfo. A copy of `info` with its path
+// rebased under the subtree root is forwarded to the base filesystem.
+Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
+    const FileInfo& info) {
+  std::string full_path = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  FileInfo rebased(info);
+  rebased.set_path(std::move(full_path));
+  return base_fs_->OpenInputFile(rebased);
+}
+
+// Async open of a random-access file for a non-empty subtree-relative path;
+// delegates to the base filesystem's async implementation.
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+    const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputFileAsync(full_path);
+}
+
+// Async open of a random-access file from a FileInfo. A copy of `info` with
+// its path rebased under the subtree root is forwarded to the base
+// filesystem's async implementation.
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+    const FileInfo& info) {
+  std::string full_path = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  FileInfo rebased(info);
+  rebased.set_path(std::move(full_path));
+  return base_fs_->OpenInputFileAsync(rebased);
+}
+
+// Open an output stream for a non-empty subtree-relative path, passing the
+// metadata through unchanged.
+Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenOutputStream(full_path, metadata);
+}
+
+// Open an append stream for a non-empty subtree-relative path, passing the
+// metadata through unchanged.
+Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenAppendStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenAppendStream(full_path, metadata);
+}
+
+//////////////////////////////////////////////////////////////////////////
+// SlowFileSystem implementation
+
+// Wrap `base_fs`, drawing the delay for every operation from `latencies`.
+//
+// The IOContext is inherited from `base_fs`. Both shared_ptr parameters are
+// sinks and are moved into the members; this is safe because the FileSystem
+// base is initialized (reading base_fs) before any member, and neither member
+// initializer reads the other parameter.
+SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
+                               std::shared_ptr<io::LatencyGenerator> latencies)
+    : FileSystem(base_fs->io_context()),
+      base_fs_(std::move(base_fs)),
+      latencies_(std::move(latencies)) {}
+
+// Wrap `base_fs` with a latency generator built by
+// io::LatencyGenerator::Make(average_latency).
+//
+// The IOContext is inherited from `base_fs`. The by-value shared_ptr is moved
+// into base_fs_; this is safe because the FileSystem base is initialized
+// (reading base_fs) before any member and latencies_ does not read it.
+SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
+                               double average_latency)
+    : FileSystem(base_fs->io_context()),
+      base_fs_(std::move(base_fs)),
+      latencies_(io::LatencyGenerator::Make(average_latency)) {}
+
+// Wrap `base_fs` with a latency generator built by
+// io::LatencyGenerator::Make(average_latency, seed).
+//
+// The IOContext is inherited from `base_fs`. The by-value shared_ptr is moved
+// into base_fs_; this is safe because the FileSystem base is initialized
+// (reading base_fs) before any member and latencies_ does not read it.
+SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
+                               double average_latency, int32_t seed)
+    : FileSystem(base_fs->io_context()),
+      base_fs_(std::move(base_fs)),
+      latencies_(io::LatencyGenerator::Make(average_latency, seed)) {}
+
+// Identity comparison only: a SlowFileSystem equals nothing but itself.
+bool SlowFileSystem::Equals(const FileSystem& other) const {
+  const FileSystem* rhs = &other;
+  return rhs == this;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Result<FileInfo> SlowFileSystem::GetFileInfo(const std::string& path) {
+  latencies_->Sleep();
+  auto looked_up = base_fs_->GetFileInfo(path);
+  return looked_up;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Result<std::vector<FileInfo>> SlowFileSystem::GetFileInfo(const FileSelector& selector) {
+  latencies_->Sleep();
+  auto listed = base_fs_->GetFileInfo(selector);
+  return listed;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::CreateDir(const std::string& path, bool recursive) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->CreateDir(path, recursive);
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::DeleteDir(const std::string& path) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->DeleteDir(path);
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::DeleteDirContents(const std::string& path) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->DeleteDirContents(path);
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::DeleteRootDirContents() {
+  latencies_->Sleep();
+  Status outcome = base_fs_->DeleteRootDirContents();
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::DeleteFile(const std::string& path) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->DeleteFile(path);
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::Move(const std::string& src, const std::string& dest) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->Move(src, dest);
+  return outcome;
+}
+
+// Sleep for one generated latency, then forward to the wrapped filesystem.
+Status SlowFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  latencies_->Sleep();
+  Status outcome = base_fs_->CopyFile(src, dest);
+  return outcome;
+}
+
+// Sleep for one generated latency, open the stream on the wrapped filesystem
+// and wrap it in a SlowInputStream sharing the same latency generator.
+Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream(
+    const std::string& path) {
+  latencies_->Sleep();
+  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputStream(path));
+  std::shared_ptr<io::InputStream> slowed =
+      std::make_shared<io::SlowInputStream>(inner, latencies_);
+  return slowed;
+}
+
+// Sleep for one generated latency, open the stream on the wrapped filesystem
+// and wrap it in a SlowInputStream sharing the same latency generator.
+Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream(
+    const FileInfo& info) {
+  latencies_->Sleep();
+  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputStream(info));
+  std::shared_ptr<io::InputStream> slowed =
+      std::make_shared<io::SlowInputStream>(inner, latencies_);
+  return slowed;
+}
+
+// Sleep for one generated latency, open the file on the wrapped filesystem
+// and wrap it in a SlowRandomAccessFile sharing the same latency generator.
+Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile(
+    const std::string& path) {
+  latencies_->Sleep();
+  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputFile(path));
+  std::shared_ptr<io::RandomAccessFile> slowed =
+      std::make_shared<io::SlowRandomAccessFile>(inner, latencies_);
+  return slowed;
+}
+
+// Sleep for one generated latency, open the file on the wrapped filesystem
+// and wrap it in a SlowRandomAccessFile sharing the same latency generator.
+Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile(
+    const FileInfo& info) {
+  latencies_->Sleep();
+  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputFile(info));
+  std::shared_ptr<io::RandomAccessFile> slowed =
+      std::make_shared<io::SlowRandomAccessFile>(inner, latencies_);
+  return slowed;
+}
+
+// Sleep for one generated latency, then return the wrapped filesystem's
+// output stream as-is (the stream itself is not slowed down).
+Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenOutputStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  latencies_->Sleep();
+  // XXX Should we have a SlowOutputStream that waits on Flush() and Close()?
+  auto opened = base_fs_->OpenOutputStream(path, metadata);
+  return opened;
+}
+
+// Sleep for one generated latency, then return the wrapped filesystem's
+// append stream as-is (the stream itself is not slowed down).
+Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  latencies_->Sleep();
+  auto opened = base_fs_->OpenAppendStream(path, metadata);
+  return opened;
+}
+
+// Copy sources[i] to destinations[i] for every index i.
+//
+// The two vectors must have the same length, otherwise Status::Invalid is
+// returned. When source and destination compare equal as filesystems the
+// filesystem's own CopyFile() is used; otherwise the data is streamed (along
+// with the source's metadata) in chunks of `chunk_size`. Work is dispatched
+// through OptionalParallelFor on io_context's executor, honoring
+// `use_threads`.
+Status CopyFiles(const std::vector<FileLocator>& sources,
+                 const std::vector<FileLocator>& destinations,
+                 const io::IOContext& io_context, int64_t chunk_size, bool use_threads) {
+  const auto num_sources = sources.size();
+  if (num_sources != destinations.size()) {
+    return Status::Invalid("Trying to copy ", num_sources, " files into ",
+                           destinations.size(), " paths.");
+  }
+
+  auto copy_at = [&](int idx) -> Status {
+    const FileLocator& from = sources[idx];
+    const FileLocator& to = destinations[idx];
+
+    // Same filesystem on both ends: let it perform the copy itself.
+    if (from.filesystem->Equals(to.filesystem)) {
+      return from.filesystem->CopyFile(from.path, to.path);
+    }
+
+    // Different filesystems: stream the bytes across, carrying the metadata.
+    ARROW_ASSIGN_OR_RAISE(auto in_stream, from.filesystem->OpenInputStream(from.path));
+    ARROW_ASSIGN_OR_RAISE(const auto metadata, in_stream->ReadMetadata());
+    ARROW_ASSIGN_OR_RAISE(auto out_stream,
+                          to.filesystem->OpenOutputStream(to.path, metadata));
+    RETURN_NOT_OK(internal::CopyStream(in_stream, out_stream, chunk_size, io_context));
+    return out_stream->Close();
+  };
+
+  const int num_tasks = static_cast<int>(num_sources);
+  return ::arrow::internal::OptionalParallelFor(use_threads, num_tasks,
+                                                std::move(copy_at),
+                                                io_context.executor());
+}
+
+// Copy everything matched by `source_sel` on `source_fs` into
+// `destination_base_dir` on `destination_fs`.
+//
+// Each selected entry's path is made relative to source_sel.base_dir and
+// re-rooted under destination_base_dir. Directory entries are reduced with
+// MinimalCreateDirSet() and created first; file entries are then copied by
+// the locator-based CopyFiles() overload. Entries that are neither files nor
+// directories are ignored. An entry outside the selector's base directory
+// yields Status::Invalid.
+Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
+                 const FileSelector& source_sel,
+                 const std::shared_ptr<FileSystem>& destination_fs,
+                 const std::string& destination_base_dir, const io::IOContext& io_context,
+                 int64_t chunk_size, bool use_threads) {
+  ARROW_ASSIGN_OR_RAISE(auto source_infos, source_fs->GetFileInfo(source_sel));
+  if (source_infos.empty()) {
+    return Status::OK();
+  }
+
+  std::vector<FileLocator> file_sources;
+  std::vector<FileLocator> file_destinations;
+  std::vector<std::string> dirs_to_create;
+
+  for (const FileInfo& entry : source_infos) {
+    const auto rel_path = internal::RemoveAncestor(source_sel.base_dir, entry.path());
+    if (!rel_path.has_value()) {
+      return Status::Invalid("GetFileInfo() yielded path '", entry.path(),
+                             "', which is outside base dir '", source_sel.base_dir, "'");
+    }
+
+    auto target =
+        internal::ConcatAbstractPath(destination_base_dir, rel_path->to_string());
+
+    if (entry.IsDirectory()) {
+      dirs_to_create.push_back(target);
+      continue;
+    }
+    if (entry.IsFile()) {
+      file_sources.push_back({source_fs, entry.path()});
+      file_destinations.push_back({destination_fs, target});
+    }
+  }
+
+  // Create the reduced set of directories before copying any file.
+  dirs_to_create = internal::MinimalCreateDirSet(std::move(dirs_to_create));
+  auto make_dir = [&](int idx) { return destination_fs->CreateDir(dirs_to_create[idx]); };
+  RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
+      use_threads, static_cast<int>(dirs_to_create.size()), std::move(make_dir),
+      io_context.executor()));
+
+  return CopyFiles(file_sources, file_destinations, io_context, chunk_size, use_threads);
+}
+
+namespace {
+
+// Parse `uri_string` into a Uri.
+//
+// On non-Windows builds a parse failure is returned as-is. On Windows a
+// failed parse is retried after converting backslashes with ToSlashes(); the
+// retry is only accepted when the resulting scheme is "file", otherwise the
+// error from the first attempt is returned.
+Result<Uri> ParseFileSystemUri(const std::string& uri_string) {
+ Uri uri;
+ auto status = uri.Parse(uri_string);
+ if (!status.ok()) {
+#ifdef _WIN32
+ // Could be a "file:..." URI with backslashes instead of regular slashes.
+ RETURN_NOT_OK(uri.Parse(ToSlashes(uri_string)));
+ if (uri.scheme() != "file") {
+ return status;
+ }
+#else
+ return status;
+#endif
+ }
+ return std::move(uri);
+}
+
+// Instantiate the filesystem implementation selected by the URI scheme.
+//
+// Recognized schemes:
+//   "file"            -> LocalFileSystem
+//   "hdfs" / "viewfs" -> HadoopFileSystem (only when built with ARROW_HDFS)
+//   "s3"              -> S3FileSystem (only when built with ARROW_S3)
+//   "mock"            -> internal::MockFileSystem
+// A scheme whose backend is compiled out yields NotImplemented; any other
+// scheme yields Invalid.
+//
+// `uri_string` is the original text and is only used in the error message.
+// `out_path` may be null; when non-null it receives the path component to use
+// with the returned filesystem.
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
+ const std::string& uri_string,
+ const io::IOContext& io_context,
+ std::string* out_path) {
+ const auto scheme = uri.scheme();
+
+ if (scheme == "file") {
+ std::string path;
+ ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path));
+ if (out_path != nullptr) {
+ *out_path = path;
+ }
+ return std::make_shared<LocalFileSystem>(options, io_context);
+ }
+ if (scheme == "hdfs" || scheme == "viewfs") {
+#ifdef ARROW_HDFS
+ ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri));
+ if (out_path != nullptr) {
+ *out_path = uri.path();
+ }
+ ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options, io_context));
+ return hdfs;
+#else
+ return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support");
+#endif
+ }
+ if (scheme == "s3") {
+#ifdef ARROW_S3
+ RETURN_NOT_OK(EnsureS3Initialized());
+ // S3Options::FromUri() is handed out_path directly.
+ ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
+ ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context));
+ return s3fs;
+#else
+ return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support");
+#endif
+ }
+
+ if (scheme == "mock") {
+ // MockFileSystem does not have an absolute / relative path distinction,
+ // normalize path by removing leading slash.
+ if (out_path != nullptr) {
+ *out_path = std::string(RemoveLeadingSlash(uri.path()));
+ }
+ return std::make_shared<internal::MockFileSystem>(internal::CurrentTimePoint(),
+ io_context);
+ }
+
+ return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string);
+}
+
+} // namespace
+
+// Convenience overload using io::default_io_context().
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
+                                                      std::string* out_path) {
+  const io::IOContext& default_ctx = io::default_io_context();
+  return FileSystemFromUri(uri_string, default_ctx, out_path);
+}
+
+// Parse `uri_string` and build the filesystem its scheme designates, using
+// the given IOContext. `out_path` is forwarded to FileSystemFromUriReal().
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
+                                                      const io::IOContext& io_context,
+                                                      std::string* out_path) {
+  ARROW_ASSIGN_OR_RAISE(auto parsed_uri, ParseFileSystemUri(uri_string));
+  auto made = FileSystemFromUriReal(parsed_uri, uri_string, io_context, out_path);
+  return made;
+}
+
+// Convenience overload using io::default_io_context().
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const std::string& uri_string,
+                                                            std::string* out_path) {
+  const io::IOContext& default_ctx = io::default_io_context();
+  return FileSystemFromUriOrPath(uri_string, default_ctx, out_path);
+}
+
+// Build a filesystem from either an absolute local path or a URI.
+//
+// When DetectAbsolutePath() recognizes `uri_string`, a LocalFileSystem with
+// default options is returned and `*out_path` (if non-null) receives the path
+// with separators normalized by ToSlashes(). Otherwise the string is handled
+// by FileSystemFromUri().
+//
+// The caller's `io_context` is honored on both branches; the local-path
+// branch previously built the LocalFileSystem with the default IOContext and
+// silently dropped the one passed in.
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+    const std::string& uri_string, const io::IOContext& io_context,
+    std::string* out_path) {
+  if (internal::DetectAbsolutePath(uri_string)) {
+    // Normalize path separators
+    if (out_path != nullptr) {
+      *out_path = ToSlashes(uri_string);
+    }
+    return std::make_shared<LocalFileSystem>(LocalFileSystemOptions::Defaults(),
+                                             io_context);
+  }
+  return FileSystemFromUri(uri_string, io_context, out_path);
+}
+
+// Status-returning variant: the filesystem is written to `*out_fs` on
+// success, and any error from the Result-returning overload is returned.
+Status FileSystemFromUri(const std::string& uri, std::shared_ptr<FileSystem>* out_fs,
+                         std::string* out_path) {
+  auto maybe_fs = FileSystemFromUri(uri, out_path);
+  return std::move(maybe_fs).Value(out_fs);
+}
+
+// Store `options` into internal::global_options, replacing whatever was there
+// before. Always returns OK.
+Status Initialize(const FileSystemGlobalOptions& options) {
+ internal::global_options = options;
+ return Status::OK();
+}
+
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h
new file mode 100644
index 00000000000..c739471c725
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/filesystem/type_fwd.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/compare.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
+#include "arrow/util/visibility.h"
+#include "arrow/util/windows_fixup.h"
+
+namespace arrow {
+namespace fs {
+
+// A system clock time point expressed as a 64-bit (or more) number of
+// nanoseconds since the epoch.
+using TimePoint =
+ std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>;
+
+ARROW_EXPORT std::string ToString(FileType);
+
+ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType);
+
+static const int64_t kNoSize = -1;
+static const TimePoint kNoTime = TimePoint(TimePoint::duration(-1));
+
+/// \brief FileSystem entry info (path, type, size and modification time)
+struct ARROW_EXPORT FileInfo : public util::EqualityComparable<FileInfo> {
+ FileInfo() = default;
+ FileInfo(FileInfo&&) = default;
+ FileInfo& operator=(FileInfo&&) = default;
+ FileInfo(const FileInfo&) = default;
+ FileInfo& operator=(const FileInfo&) = default;
+
+ explicit FileInfo(std::string path, FileType type = FileType::Unknown)
+ : path_(std::move(path)), type_(type) {}  // size_ and mtime_ keep their kNoSize / kNoTime defaults
+
+ /// The file type
+ FileType type() const { return type_; }
+ void set_type(FileType type) { type_ = type; }
+
+ /// The full file path in the filesystem
+ const std::string& path() const { return path_; }
+ void set_path(std::string path) { path_ = std::move(path); }
+
+ /// The file base name (component after the last directory separator)
+ std::string base_name() const;
+
+ /// The directory base name (component before the file base name).
+ std::string dir_name() const;
+
+ /// The size in bytes, if available (kNoSize is the default value)
+ ///
+ /// Only regular files are guaranteed to have a size.
+ int64_t size() const { return size_; }
+ void set_size(int64_t size) { size_ = size; }
+
+ /// The file extension (excluding the dot)
+ std::string extension() const;
+
+ /// The time of last modification, if available (kNoTime is the default value)
+ TimePoint mtime() const { return mtime_; }
+ void set_mtime(TimePoint mtime) { mtime_ = mtime; }
+
+ bool IsFile() const { return type_ == FileType::File; }
+ bool IsDirectory() const { return type_ == FileType::Directory; }
+
+ bool Equals(const FileInfo& other) const {  // member-wise comparison of all four fields
+ return type() == other.type() && path() == other.path() && size() == other.size() &&
+ mtime() == other.mtime();
+ }
+
+ std::string ToString() const;
+
+ /// Function object implementing less-than comparison and hashing by
+ /// path, to support sorting infos, using them as keys, and other
+ /// interactions with the STL.
+ struct ByPath {
+ bool operator()(const FileInfo& l, const FileInfo& r) const {  // two-argument form: ordering by path
+ return l.path() < r.path();
+ }
+
+ size_t operator()(const FileInfo& i) const {  // one-argument form: hash of the path
+ return std::hash<std::string>{}(i.path());
+ }
+ };
+
+ protected:
+ std::string path_;
+ FileType type_ = FileType::Unknown;
+ int64_t size_ = kNoSize;
+ TimePoint mtime_ = kNoTime;
+};
+
+ARROW_EXPORT std::ostream& operator<<(std::ostream& os, const FileInfo&);
+
+/// \brief File selector for filesystem APIs
+struct ARROW_EXPORT FileSelector {
+  /// The directory in which to select files.
+  /// It should be an error if the path exists but doesn't point to a directory.
+  std::string base_dir;
+  /// What happens when `base_dir` isn't found in the filesystem:
+  /// false means an error is returned, true means an empty selection is returned.
+  bool allow_not_found = false;
+  /// Whether to recurse into subdirectories.
+  bool recursive = false;
+  /// The maximum number of subdirectories to recurse into.
+  int32_t max_recursion = INT32_MAX;
+
+  // All defaults come from the member initializers above.
+  FileSelector() {}
+};
+
+/// \brief FileSystem, path pair (used by CopyFiles to name sources and destinations)
+struct ARROW_EXPORT FileLocator {
+ std::shared_ptr<FileSystem> filesystem;
+ std::string path;  // path within `filesystem`
+};
+
+using FileInfoVector = std::vector<FileInfo>;
+using FileInfoGenerator = std::function<Future<FileInfoVector>()>;
+
+} // namespace fs
+
+template <>
+struct IterationTraits<fs::FileInfoVector> {  // end-of-iteration marker for FileInfoVector: the empty vector
+ static fs::FileInfoVector End() { return {}; }
+ static bool IsEnd(const fs::FileInfoVector& val) { return val.empty(); }
+};
+
+namespace fs {
+
+/// \brief Abstract file system API
+class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
+ public:
+  virtual ~FileSystem();
+
+  /// A short name identifying the filesystem implementation.
+  virtual std::string type_name() const = 0;
+
+  /// EXPERIMENTAL: The IOContext associated with this filesystem.
+  const io::IOContext& io_context() const { return io_context_; }
+
+  /// Normalize path for the given filesystem
+  ///
+  /// The default implementation of this method is a no-op, but subclasses
+  /// may allow normalizing irregular path forms (such as Windows local paths).
+  virtual Result<std::string> NormalizePath(std::string path);
+
+  virtual bool Equals(const FileSystem& other) const = 0;
+
+  /// Same, for a shared pointer.
+  ///
+  /// A null pointer never compares equal to this filesystem.
+  virtual bool Equals(const std::shared_ptr<FileSystem>& other) const {
+    // Guard against null before dereferencing.
+    return other != nullptr && Equals(*other);
+  }
+
+  /// Get info for the given target.
+  ///
+  /// Any symlink is automatically dereferenced, recursively.
+  /// A nonexistent or unreachable file returns an Ok status and
+  /// has a FileType of value NotFound.  An error status indicates
+  /// a truly exceptional condition (low-level I/O error, etc.).
+  virtual Result<FileInfo> GetFileInfo(const std::string& path) = 0;
+  /// Same, for many targets at once.
+  virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths);
+  /// Same, according to a selector.
+  ///
+  /// The selector's base directory will not be part of the results, even if
+  /// it exists.
+  /// If it doesn't exist, see `FileSelector::allow_not_found`.
+  virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select) = 0;
+
+  /// EXPERIMENTAL: async version of GetFileInfo
+  virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths);
+
+  /// EXPERIMENTAL: streaming async version of GetFileInfo
+  ///
+  /// The returned generator is not async-reentrant, i.e. you need to wait for
+  /// the returned future to complete before calling the generator again.
+  virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select);
+
+  /// Create a directory and subdirectories.
+  ///
+  /// This function succeeds if the directory already exists.
+  virtual Status CreateDir(const std::string& path, bool recursive = true) = 0;
+
+  /// Delete a directory and its contents, recursively.
+  virtual Status DeleteDir(const std::string& path) = 0;
+
+  /// Delete a directory's contents, recursively.
+  ///
+  /// Like DeleteDir, but doesn't delete the directory itself.
+  /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents.
+  virtual Status DeleteDirContents(const std::string& path) = 0;
+
+  /// EXPERIMENTAL: Delete the root directory's contents, recursively.
+  ///
+  /// Implementations may decide to raise an error if this operation is
+  /// too dangerous.
+  // NOTE: may decide to remove this if it's deemed not useful
+  virtual Status DeleteRootDirContents() = 0;
+
+  /// Delete a file.
+  virtual Status DeleteFile(const std::string& path) = 0;
+  /// Delete many files.
+  ///
+  /// The default implementation issues individual delete operations in sequence.
+  virtual Status DeleteFiles(const std::vector<std::string>& paths);
+
+  /// Move / rename a file or directory.
+  ///
+  /// If the destination exists:
+  /// - if it is a non-empty directory, an error is returned
+  /// - otherwise, if it has the same type as the source, it is replaced
+  /// - otherwise, behavior is unspecified (implementation-dependent).
+  virtual Status Move(const std::string& src, const std::string& dest) = 0;
+
+  /// Copy a file.
+  ///
+  /// If the destination exists and is a directory, an error is returned.
+  /// Otherwise, it is replaced.
+  virtual Status CopyFile(const std::string& src, const std::string& dest) = 0;
+
+  /// Open an input stream for sequential reading.
+  virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+      const std::string& path) = 0;
+  /// Open an input stream for sequential reading.
+  ///
+  /// This overload assumes the given FileInfo validly represents the file's
+  /// characteristics, and may optimize access depending on them (for example
+  /// avoid querying the file size or its existence).
+  virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info);
+
+  /// Open an input file for random access reading.
+  virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const std::string& path) = 0;
+  /// Open an input file for random access reading.
+  ///
+  /// This overload assumes the given FileInfo validly represents the file's
+  /// characteristics, and may optimize access depending on them (for example
+  /// avoid querying the file size or its existence).
+  virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const FileInfo& info);
+
+  /// EXPERIMENTAL: async version of OpenInputStream
+  virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const std::string& path);
+  /// EXPERIMENTAL: async version of OpenInputStream
+  virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const FileInfo& info);
+
+  /// EXPERIMENTAL: async version of OpenInputFile
+  virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const std::string& path);
+  /// EXPERIMENTAL: async version of OpenInputFile
+  virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const FileInfo& info);
+
+  /// Open an output stream for sequential writing.
+  ///
+  /// If the target already exists, existing data is truncated.
+  virtual Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
+  /// Same, without passing any metadata argument.
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path);
+
+  /// Open an output stream for appending.
+  ///
+  /// If the target doesn't exist, a new empty file is created.
+  virtual Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
+  /// Same, without passing any metadata argument.
+  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path);
+
+ protected:
+  explicit FileSystem(const io::IOContext& io_context = io::default_io_context())
+      : io_context_(io_context) {}
+
+  io::IOContext io_context_;
+  // Whether metadata operations (such as GetFileInfo or OpenInputStream)
+  // are cheap enough that the default async variants don't bother with
+  // a thread pool.
+  bool default_async_is_sync_ = true;
+};
+
+/// \brief A FileSystem implementation that delegates to another
+/// implementation after prepending a fixed base path.
+///
+/// This is useful to expose a logical view of a subtree of a filesystem,
+/// for example a directory in a LocalFileSystem.
+/// This works on abstract paths, i.e. paths using forward slashes and
+/// a single root "/". Windows paths are not guaranteed to work.
+/// This makes no security guarantee. For example, symlinks may allow callers to
+/// "escape" the subtree and access other parts of the underlying filesystem.
+class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
+ public:
+ // This constructor may abort if base_path is invalid.
+ explicit SubTreeFileSystem(const std::string& base_path,
+ std::shared_ptr<FileSystem> base_fs);
+ ~SubTreeFileSystem() override;
+
+ std::string type_name() const override { return "subtree"; }
+ std::string base_path() const { return base_path_; }  // returned by value (copy)
+ std::shared_ptr<FileSystem> base_fs() const { return base_fs_; }
+
+ Result<std::string> NormalizePath(std::string path) override;
+
+ bool Equals(const FileSystem& other) const override;
+
+ /// \cond FALSE
+ using FileSystem::GetFileInfo;
+ /// \endcond
+ Result<FileInfo> GetFileInfo(const std::string& path) override;
+ Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+ FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ Status DeleteDir(const std::string& path) override;
+ Status DeleteDirContents(const std::string& path) override;
+ Status DeleteRootDirContents() override;
+
+ Status DeleteFile(const std::string& path) override;
+
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const FileInfo& info) override;
+
+ Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+ const std::string& path) override;
+ Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+ const FileInfo& info) override;
+ Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+ const std::string& path) override;
+ Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+ const FileInfo& info) override;
+
+ Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+ SubTreeFileSystem() {}
+
+ const std::string base_path_;
+ std::shared_ptr<FileSystem> base_fs_;
+
+ // Path-translation helpers; NOTE(review): semantics inferred from names only, see the .cc file.
+ std::string PrependBase(const std::string& s) const;
+ Status PrependBaseNonEmpty(std::string* s) const;
+ Result<std::string> StripBase(const std::string& s) const;
+ Status FixInfo(FileInfo* info) const;
+
+ static Result<std::string> NormalizeBasePath(
+ std::string base_path, const std::shared_ptr<FileSystem>& base_fs);
+};
+
+/// \brief A FileSystem implementation that delegates to another
+/// implementation but inserts latencies at various points.
+class ARROW_EXPORT SlowFileSystem : public FileSystem {
+ public:
+ SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
+ std::shared_ptr<io::LatencyGenerator> latencies);
+ SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency);  // NOTE(review): latency unit is not stated here - confirm
+ SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency,
+ int32_t seed);  // NOTE(review): presumably seeds the latency generator - confirm
+
+ std::string type_name() const override { return "slow"; }
+ bool Equals(const FileSystem& other) const override;
+
+ using FileSystem::GetFileInfo;  // keep the base-class GetFileInfo overloads visible
+ Result<FileInfo> GetFileInfo(const std::string& path) override;
+ Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ Status DeleteDir(const std::string& path) override;
+ Status DeleteDirContents(const std::string& path) override;
+ Status DeleteRootDirContents() override;
+
+ Status DeleteFile(const std::string& path) override;
+
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const FileInfo& info) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+ std::shared_ptr<FileSystem> base_fs_;
+ std::shared_ptr<io::LatencyGenerator> latencies_;
+};
+
+/// \defgroup filesystem-factories Functions for creating FileSystem instances
+///
+/// @{
+
+/// \brief Create a new FileSystem by URI, using the default IO context
+///
+/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[out] out_path (optional, may be NULLPTR) Path inside the filesystem.
+/// \return the FileSystem instance, or an error.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
+ std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI with a custom IO context
+///
+/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[in] io_context an IOContext which will be associated with the filesystem
+/// \param[out] out_path (optional, may be NULLPTR) Path inside the filesystem.
+/// \return the FileSystem instance, or an error.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
+ const io::IOContext& io_context,
+ std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI or local path, using the default IO context
+///
+/// Same as FileSystemFromUri, but in addition also recognize non-URIs
+/// and treat them as local filesystem paths. Only absolute local filesystem
+/// paths are allowed. `out_path` is optional and may be NULLPTR.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+ const std::string& uri, std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI or local path with a custom IO context
+///
+/// Same as FileSystemFromUri, but in addition also recognize non-URIs
+/// and treat them as local filesystem paths. Only absolute local filesystem
+/// paths are allowed. `out_path` is optional and may be NULLPTR.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+ const std::string& uri, const io::IOContext& io_context,
+ std::string* out_path = NULLPTR);
+
+/// @}
+
+/// \brief Copy files, including from one FileSystem to another
+///
+/// If a source and destination are resident in the same FileSystem, FileSystem::CopyFile
+/// will be used; otherwise the file will be opened as a stream in both FileSystems and
+/// chunks copied from the source to the destination. No directories will be created.
+ARROW_EXPORT
+Status CopyFiles(const std::vector<FileLocator>& sources,
+ const std::vector<FileLocator>& destinations,
+ const io::IOContext& io_context = io::default_io_context(),
+ int64_t chunk_size = 1024 * 1024, bool use_threads = true);  // default chunk_size is 1024 * 1024
+
+/// \brief Copy selected files, including from one FileSystem to another
+///
+/// Directories will be created under the destination base directory as needed.
+ARROW_EXPORT
+Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
+ const FileSelector& source_sel,
+ const std::shared_ptr<FileSystem>& destination_fs,
+ const std::string& destination_base_dir,
+ const io::IOContext& io_context = io::default_io_context(),
+ int64_t chunk_size = 1024 * 1024, bool use_threads = true);  // same defaults as the FileLocator-based overload
+
+struct FileSystemGlobalOptions {  // options passed to Initialize()
+ /// Path to a single PEM file holding all TLS CA certificates
+ ///
+ /// If empty, the underlying TLS library's defaults will be used.
+ std::string tls_ca_file_path;
+
+ /// Path to a directory holding TLS CA certificates in individual PEM files
+ /// named according to the OpenSSL "hashed" format.
+ ///
+ /// If empty, the underlying TLS library's defaults will be used.
+ std::string tls_ca_dir_path;
+};
+
+/// Experimental: optional global initialization routine
+///
+/// This is for environments (such as manylinux) where the path
+/// to TLS CA certificates needs to be configured at runtime.
+ARROW_EXPORT
+Status Initialize(const FileSystemGlobalOptions& options);  // the options are kept in a process-wide global
+
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc
new file mode 100644
index 00000000000..775fd746aa6
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstring>
+#include <sstream>
+#include <utility>
+
+#ifdef _WIN32
+#include "arrow/util/windows_compatibility.h"
+#else
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#endif
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/io/file.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/uri.h"
+#include "arrow/util/windows_fixup.h"
+
+namespace arrow {
+namespace fs {
+
+using ::arrow::internal::IOErrorFromErrno;
+#ifdef _WIN32
+using ::arrow::internal::IOErrorFromWinError;
+#endif
+using ::arrow::internal::NativePathString;
+using ::arrow::internal::PlatformFilename;
+
+namespace internal {
+
+#ifdef _WIN32
+// ASCII-only check for a drive letter.  Locale-dependent functions from the
+// C/C++ stdlib can't be used here.
+static bool IsDriveLetter(char c) {
+  const bool is_upper = ('A' <= c && c <= 'Z');
+  const bool is_lower = ('a' <= c && c <= 'z');
+  return is_upper || is_lower;
+}
+#endif
+
+// Returns true if `s` looks like an absolute local filesystem path.
+bool DetectAbsolutePath(const std::string& s) {
+  if (s.empty()) {
+    return false;
+  }
+  const char first = s.front();
+  // A leading '/' marks an absolute path on every platform.
+  if (first == '/') {
+    return true;
+  }
+#ifdef _WIN32
+  // On Windows a leading '\' is accepted as well.
+  if (first == '\\') {
+    return true;
+  }
+  // Drive letter followed by ':' and a /- or \-separator, e.g. "C:\..."
+  if (s.length() >= 3 && s[1] == ':' && IsDriveLetter(first)) {
+    const char sep = s[2];
+    if (sep == '/' || sep == '\\') {
+      return true;
+    }
+  }
+#endif
+  return false;
+}
+
+} // namespace internal
+
+namespace {
+
+#ifdef _WIN32
+
+// Convert a native path string to std::string by way of PlatformFilename.
+std::string NativeToString(const NativePathString& ns) {
+  return PlatformFilename(ns).ToString();
+}
+
+// Convert a Windows FILETIME (100 ns ticks since January 1, 1601 UTC) into a
+// TimePoint counted from the Unix epoch.
+TimePoint ToTimePoint(FILETIME ft) {
+  // Hundreds of nanoseconds between January 1, 1601 (UTC) and the Unix epoch.
+  static constexpr int64_t kFileTimeEpoch = 11644473600LL * 10000000;
+
+  const int64_t high_part = static_cast<int64_t>(ft.dwHighDateTime) << 32;
+  const int64_t ticks_since_1601 = high_part + ft.dwLowDateTime;
+  // hundreds of ns since Unix epoch
+  const int64_t ticks_since_unix = ticks_since_1601 - kFileTimeEpoch;
+  const std::chrono::nanoseconds elapsed(100 * ticks_since_unix);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(elapsed));
+}
+
+// Build a FileInfo (type, size, mtime; no path) from Win32 file information.
+FileInfo FileInformationToFileInfo(const BY_HANDLE_FILE_INFORMATION& information) {
+  const bool is_directory =
+      (information.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;
+
+  FileInfo result;
+  if (!is_directory) {
+    // Regular file: the size is split into two 32-bit halves
+    const int64_t size_high = static_cast<int64_t>(information.nFileSizeHigh) << 32;
+    result.set_type(FileType::File);
+    result.set_size(size_high + information.nFileSizeLow);
+  } else {
+    result.set_type(FileType::Directory);
+    result.set_size(kNoSize);
+  }
+  result.set_mtime(ToTimePoint(information.ftLastWriteTime));
+  return result;
+}
+
+// Query type, size and modification time of the entry at `path` (Windows).
+//
+// A missing file or path is not an error: a FileInfo of type NotFound is
+// returned in that case.  Any other failure is reported through
+// IOErrorFromWinError.
+Result<FileInfo> StatFile(const std::wstring& path) {
+  HANDLE h;
+  std::string bytes_path = NativeToString(path);
+  FileInfo info;
+
+  /* Inspired by CPython, see Modules/posixmodule.c */
+  h = CreateFileW(path.c_str(), FILE_READ_ATTRIBUTES, /* desired access */
+                  0,                                  /* share mode */
+                  NULL,                               /* security attributes */
+                  OPEN_EXISTING,
+                  /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
+                  FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS, NULL);
+
+  if (h == INVALID_HANDLE_VALUE) {
+    const DWORD err = GetLastError();
+    if (err == ERROR_FILE_NOT_FOUND || err == ERROR_PATH_NOT_FOUND) {
+      info.set_path(bytes_path);
+      info.set_type(FileType::NotFound);
+      info.set_mtime(kNoTime);
+      info.set_size(kNoSize);
+      return info;
+    } else {
+      // Report the code captured right after the failing call.
+      return IOErrorFromWinError(err, "Failed querying information for path '",
+                                 bytes_path, "'");
+    }
+  }
+  BY_HANDLE_FILE_INFORMATION information;
+  if (!GetFileInformationByHandle(h, &information)) {
+    // Fetch the error code before CloseHandle() gets a chance to overwrite it.
+    const DWORD err = GetLastError();
+    CloseHandle(h);
+    return IOErrorFromWinError(err, "Failed querying information for path '",
+                               bytes_path, "'");
+  }
+  CloseHandle(h);
+  info = FileInformationToFileInfo(information);
+  info.set_path(bytes_path);
+  return info;
+}
+
+#else // POSIX systems
+
+// Convert a POSIX timespec (seconds + nanoseconds) into a TimePoint.
+TimePoint ToTimePoint(const struct timespec& s) {
+  const int64_t whole_seconds_ns = static_cast<int64_t>(s.tv_sec) * 1000000000;
+  const int64_t fraction_ns = static_cast<int64_t>(s.tv_nsec);
+  const std::chrono::nanoseconds total(whole_seconds_ns + fraction_ns);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(total));
+}
+
+// Build a FileInfo (type, size, mtime; no path) from a POSIX stat structure.
+FileInfo StatToFileInfo(const struct stat& s) {
+  // Anything that is neither a regular file nor a directory is Unknown,
+  // and only regular files carry a size.
+  FileType kind = FileType::Unknown;
+  int64_t byte_size = kNoSize;
+  if (S_ISREG(s.st_mode)) {
+    kind = FileType::File;
+    byte_size = static_cast<int64_t>(s.st_size);
+  } else if (S_ISDIR(s.st_mode)) {
+    kind = FileType::Directory;
+  }
+
+  FileInfo result;
+  result.set_type(kind);
+  result.set_size(byte_size);
+#ifdef __APPLE__
+  // macOS doesn't use the POSIX-compliant spelling
+  result.set_mtime(ToTimePoint(s.st_mtimespec));
+#else
+  result.set_mtime(ToTimePoint(s.st_mtim));
+#endif
+  return result;
+}
+
+// Query type, size and modification time of the entry at `path` (POSIX).
+//
+// ENOENT, ENOTDIR and ELOOP are reported as a FileInfo of type NotFound;
+// any other stat() failure becomes an error.
+Result<FileInfo> StatFile(const std::string& path) {
+  struct stat st;
+  if (stat(path.c_str(), &st) == -1) {
+    const int err = errno;
+    const bool missing = (err == ENOENT || err == ENOTDIR || err == ELOOP);
+    if (!missing) {
+      return IOErrorFromErrno(err, "Failed stat()ing path '", path, "'");
+    }
+    FileInfo not_found;
+    not_found.set_type(FileType::NotFound);
+    not_found.set_mtime(kNoTime);
+    not_found.set_size(kNoSize);
+    not_found.set_path(path);
+    return not_found;
+  }
+  FileInfo info = StatToFileInfo(st);
+  info.set_path(path);
+  return info;
+}
+
+#endif
+
+// Append to `out` the FileInfo of every entry found under `dir_fn`.
+//
+// Subdirectories are visited when `select.recursive` is set and
+// `nesting_depth` is still below `select.max_recursion`.  If listing the
+// directory fails with an IOError, `select.allow_not_found` is set and the
+// directory doesn't exist, the call succeeds without adding anything.
+Status StatSelector(const PlatformFilename& dir_fn, const FileSelector& select,
+                    int32_t nesting_depth, std::vector<FileInfo>* out) {
+  auto result = ListDir(dir_fn);
+  if (!result.ok()) {
+    auto status = result.status();
+    if (select.allow_not_found && status.IsIOError()) {
+      ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn));
+      if (!exists) {
+        return Status::OK();
+      }
+    }
+    return status;
+  }
+
+  for (const auto& child_fn : *result) {
+    PlatformFilename full_fn = dir_fn.Join(child_fn);
+    ARROW_ASSIGN_OR_RAISE(FileInfo info, StatFile(full_fn.ToNative()));
+    // Read the type up front: `info` may be moved into `out` just below and
+    // must not be inspected afterwards.
+    const FileType type = info.type();
+    if (type != FileType::NotFound) {
+      out->push_back(std::move(info));
+    }
+    if (nesting_depth < select.max_recursion && select.recursive &&
+        type == FileType::Directory) {
+      RETURN_NOT_OK(StatSelector(full_fn, select, nesting_depth + 1, out));
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace
+
+LocalFileSystemOptions LocalFileSystemOptions::Defaults() {
+ return LocalFileSystemOptions();  // a value-initialized options object is the default configuration
+}
+
+bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const {
+ return use_mmap == other.use_mmap;  // use_mmap is the only field compared
+}
+
+// Derive local filesystem options and the local path from a parsed URI.
+//
+// URIs carrying a username or password are rejected.  A hostname is only
+// accepted on Windows, where the path becomes "//host/path"; elsewhere it is
+// an error.  The path is written to `*out_path` when `out_path` is non-null.
+Result<LocalFileSystemOptions> LocalFileSystemOptions::FromUri(
+    const ::arrow::internal::Uri& uri, std::string* out_path) {
+  if (!uri.username().empty() || !uri.password().empty()) {
+    return Status::Invalid("Unsupported username or password in local URI: '",
+                           uri.ToString(), "'");
+  }
+  const auto host = uri.host();
+  if (!host.empty()) {
+#ifdef _WIN32
+    std::stringstream ss;
+    ss << "//" << host << "/" << internal::RemoveLeadingSlash(uri.path());
+    if (out_path != nullptr) {
+      *out_path = ss.str();
+    }
+#else
+    return Status::Invalid("Unsupported hostname in non-Windows local URI: '",
+                           uri.ToString(), "'");
+#endif
+  } else if (out_path != nullptr) {
+    *out_path = uri.path();
+  }
+
+  // TODO handle use_mmap option
+  return LocalFileSystemOptions();
+}
+
+LocalFileSystem::LocalFileSystem(const io::IOContext& io_context)
+ : FileSystem(io_context), options_(LocalFileSystemOptions::Defaults()) {}  // default options, caller-supplied IOContext
+
+LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options,
+ const io::IOContext& io_context)
+ : FileSystem(io_context), options_(options) {}  // explicit options and IOContext
+
+LocalFileSystem::~LocalFileSystem() {}
+
+Result<std::string> LocalFileSystem::NormalizePath(std::string path) {
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));  // a conversion failure is returned to the caller
+ return fn.ToString();  // normalization is a round-trip through PlatformFilename
+}
+
+// Equal when `other` reports the same type name and its options compare
+// equal to ours.
+bool LocalFileSystem::Equals(const FileSystem& other) const {
+  if (other.type_name() != type_name()) {
+    return false;
+  }
+  // The downcast happens only after the type-name check above.
+  const auto& that = ::arrow::internal::checked_cast<const LocalFileSystem&>(other);
+  return options_.Equals(that.options());
+}
+
+Result<FileInfo> LocalFileSystem::GetFileInfo(const std::string& path) {
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));  // a conversion failure is returned to the caller
+ return StatFile(fn.ToNative());  // platform-specific stat on the native path
+}
+
+Result<std::vector<FileInfo>> LocalFileSystem::GetFileInfo(const FileSelector& select) {
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(select.base_dir));
+ std::vector<FileInfo> results;
+ RETURN_NOT_OK(StatSelector(fn, select, 0, &results));  // start at nesting depth 0
+ return results;
+}
+
+// Create the directory at `path`; with `recursive` set, CreateDirTree is
+// used instead of CreateDir.
+Status LocalFileSystem::CreateDir(const std::string& path, bool recursive) {
+  ARROW_ASSIGN_OR_RAISE(auto dir_fn, PlatformFilename::FromString(path));
+  if (!recursive) {
+    return ::arrow::internal::CreateDir(dir_fn).status();
+  }
+  return ::arrow::internal::CreateDirTree(dir_fn).status();
+}
+
+Status LocalFileSystem::DeleteDir(const std::string& path) {
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
+ auto st = ::arrow::internal::DeleteDirTree(fn, /*allow_not_found=*/false).status();
+ if (!st.ok()) {
+ // TODO Status::WithPrefix()?
+ std::stringstream ss;
+ ss << "Cannot delete directory '" << path << "': " << st.message();
+ return st.WithMessage(ss.str());
+ }
+ return Status::OK();
+}
+
+// Delete everything inside the directory at `path`, keeping the directory.
+// Empty paths are rejected; a missing directory is an error.
+Status LocalFileSystem::DeleteDirContents(const std::string& path) {
+  if (internal::IsEmptyPath(path)) {
+    return internal::InvalidDeleteDirContents(path);
+  }
+  ARROW_ASSIGN_OR_RAISE(auto dir_fn, PlatformFilename::FromString(path));
+  auto outcome =
+      ::arrow::internal::DeleteDirContents(dir_fn, /*allow_not_found=*/false).status();
+  if (outcome.ok()) {
+    return Status::OK();
+  }
+  std::stringstream message;
+  message << "Cannot delete directory contents in '" << path
+          << "': " << outcome.message();
+  return outcome.WithMessage(message.str());
+}
+
+Status LocalFileSystem::DeleteRootDirContents() {  // never performed on the local filesystem
+ return Status::Invalid("LocalFileSystem::DeleteRootDirContents is strictly forbidden");
+}
+
+Status LocalFileSystem::DeleteFile(const std::string& path) {
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
+ return ::arrow::internal::DeleteFile(fn, /*allow_not_found=*/false).status();  // a missing file is reported as an error
+}
+
+// Rename `src` to `dest` using MoveFileExW (Windows, replacing an existing
+// destination) or rename() (elsewhere).
+Status LocalFileSystem::Move(const std::string& src, const std::string& dest) {
+  ARROW_ASSIGN_OR_RAISE(auto source_fn, PlatformFilename::FromString(src));
+  ARROW_ASSIGN_OR_RAISE(auto target_fn, PlatformFilename::FromString(dest));
+
+#ifdef _WIN32
+  const bool moved =
+      MoveFileExW(source_fn.ToNative().c_str(), target_fn.ToNative().c_str(),
+                  MOVEFILE_REPLACE_EXISTING) != 0;
+  if (moved) {
+    return Status::OK();
+  }
+  return IOErrorFromWinError(GetLastError(), "Failed renaming '", source_fn.ToString(),
+                             "' to '", target_fn.ToString(), "'");
+#else
+  const int rc = rename(source_fn.ToNative().c_str(), target_fn.ToNative().c_str());
+  if (rc != -1) {
+    return Status::OK();
+  }
+  return IOErrorFromErrno(errno, "Failed renaming '", source_fn.ToString(), "' to '",
+                          target_fn.ToString(), "'");
+#endif
+}
+
+// Copy the file `src` to `dest`.  Nothing is done when both names convert to
+// the same native path.  Windows uses CopyFileW; other platforms stream the
+// data across in 1024 * 1024 byte chunks.
+Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  ARROW_ASSIGN_OR_RAISE(auto source_fn, PlatformFilename::FromString(src));
+  ARROW_ASSIGN_OR_RAISE(auto target_fn, PlatformFilename::FromString(dest));
+  // XXX should we use fstat() to compare inodes?
+  const bool same_path = (source_fn.ToNative() == target_fn.ToNative());
+  if (same_path) {
+    return Status::OK();
+  }
+
+#ifdef _WIN32
+  const bool copied =
+      CopyFileW(source_fn.ToNative().c_str(), target_fn.ToNative().c_str(),
+                FALSE /* bFailIfExists */) != 0;
+  if (copied) {
+    return Status::OK();
+  }
+  return IOErrorFromWinError(GetLastError(), "Failed copying '", source_fn.ToString(),
+                             "' to '", target_fn.ToString(), "'");
+#else
+  ARROW_ASSIGN_OR_RAISE(auto input, OpenInputStream(src));
+  ARROW_ASSIGN_OR_RAISE(auto output, OpenOutputStream(dest));
+  RETURN_NOT_OK(
+      internal::CopyStream(input, output, 1024 * 1024 /* chunk_size */, io_context()));
+  // The output is closed first; its error takes precedence.
+  RETURN_NOT_OK(output->Close());
+  return input->Close();
+#endif
+}
+
+namespace {
+
+// Open `path` for reading, either memory-mapped (options.use_mmap) or as a
+// regular readable file allocated from the IOContext's pool.
+template <typename InputStreamType>
+Result<std::shared_ptr<InputStreamType>> OpenInputStreamGeneric(
+    const std::string& path, const LocalFileSystemOptions& options,
+    const io::IOContext& io_context) {
+  if (!options.use_mmap) {
+    return io::ReadableFile::Open(path, io_context.pool());
+  }
+  return io::MemoryMappedFile::Open(path, io::FileMode::READ);
+}
+
+} // namespace
+
+Result<std::shared_ptr<io::InputStream>> LocalFileSystem::OpenInputStream(
+ const std::string& path) {  // mmap or regular file, depending on options_.use_mmap
+ return OpenInputStreamGeneric<io::InputStream>(path, options_, io_context());
+}
+
+Result<std::shared_ptr<io::RandomAccessFile>> LocalFileSystem::OpenInputFile(
+ const std::string& path) {  // mmap or regular file, depending on options_.use_mmap
+ return OpenInputStreamGeneric<io::RandomAccessFile>(path, options_, io_context());
+}
+
+namespace {
+
+// Open `path` write-only with the given truncate/append flags and wrap the
+// descriptor in a FileOutputStream.  The descriptor is closed again if the
+// stream cannot be created.
+Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::string& path,
+                                                                  bool truncate,
+                                                                  bool append) {
+  ARROW_ASSIGN_OR_RAISE(auto file_name, PlatformFilename::FromString(path));
+  ARROW_ASSIGN_OR_RAISE(
+      int fd, ::arrow::internal::FileOpenWritable(file_name, /*write_only=*/true,
+                                                  truncate, append));
+  auto stream_result = io::FileOutputStream::Open(fd);
+  const bool opened = stream_result.ok();
+  if (!opened) {
+    // Best-effort cleanup; the stream error is what gets reported.
+    ARROW_UNUSED(::arrow::internal::FileClose(fd));
+  }
+  return stream_result;
+}
+
+} // namespace
+
+// Open `path` for writing from the start, truncating existing data.
+// `metadata` is not used by the local filesystem.
+Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenOutputStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  return OpenOutputStreamGeneric(path, /*truncate=*/true, /*append=*/false);
+}
+
+// Open `path` for appending, without truncating existing data.
+// `metadata` is not used by the local filesystem.
+Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  return OpenOutputStreamGeneric(path, /*truncate=*/false, /*append=*/true);
+}
+
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h
new file mode 100644
index 00000000000..f8e77aee591
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+
+namespace arrow {
+namespace internal {
+
+class Uri;
+
+}
+
+namespace fs {
+
+/// Options for the LocalFileSystem implementation.
+struct ARROW_EXPORT LocalFileSystemOptions {
+ /// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
+ /// or a regular one.
+ bool use_mmap = false;
+
+ /// \brief Initialize with defaults
+ static LocalFileSystemOptions Defaults();
+
+ /// \brief Compare these options with `other`.
+ ///
+ /// NOTE(review): defined out of line; presumably compares use_mmap, the only
+ /// data member declared here -- confirm against the definition.
+ bool Equals(const LocalFileSystemOptions& other) const;
+
+ /// \brief Build options from a URI.
+ ///
+ /// NOTE(review): defined out of line; `out_path` looks like an output
+ /// parameter receiving the path extracted from `uri` -- confirm.
+ static Result<LocalFileSystemOptions> FromUri(const ::arrow::internal::Uri& uri,
+ std::string* out_path);
+};
+
+/// \brief A FileSystem implementation accessing files on the local machine.
+///
+/// This class handles only `/`-separated paths. If desired, conversion
+/// from Windows backslash-separated paths should be done by the caller.
+/// Details such as symlinks are abstracted away (symlinks are always
+/// followed, except when deleting an entry).
+class ARROW_EXPORT LocalFileSystem : public FileSystem {
+ public:
+ /// Construct from an IO context only (presumably with default options --
+ /// the definition is not in this header).
+ explicit LocalFileSystem(const io::IOContext& = io::default_io_context());
+ /// Construct from explicit options and an IO context.
+ explicit LocalFileSystem(const LocalFileSystemOptions&,
+ const io::IOContext& = io::default_io_context());
+ ~LocalFileSystem() override;
+
+ /// Always "local".
+ std::string type_name() const override { return "local"; }
+
+ Result<std::string> NormalizePath(std::string path) override;
+
+ bool Equals(const FileSystem& other) const override;
+
+ /// Return a copy of the options this filesystem uses.
+ LocalFileSystemOptions options() const { return options_; }
+
+ /// \cond FALSE
+ using FileSystem::GetFileInfo;
+ /// \endcond
+ Result<FileInfo> GetFileInfo(const std::string& path) override;
+ Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ Status DeleteDir(const std::string& path) override;
+ Status DeleteDirContents(const std::string& path) override;
+ Status DeleteRootDirContents() override;
+
+ Status DeleteFile(const std::string& path) override;
+
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ // The two input openers honour options_.use_mmap; the two output openers
+ // accept `metadata` but their implementations in localfs.cc do not use it.
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+ // Options supplied at construction; read by OpenInputStream/OpenInputFile.
+ LocalFileSystemOptions options_;
+};
+
+namespace internal {
+
+// Return whether the string is detected as a local absolute path.
+ARROW_EXPORT
+bool DetectAbsolutePath(const std::string& s);
+
+} // namespace internal
+
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc
new file mode 100644
index 00000000000..14a38283b26
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <iterator>
+#include <map>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/buffer_builder.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/variant.h"
+#include "arrow/util/windows_fixup.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+namespace {
+
+////////////////////////////////////////////////////////////////////////////
+// Filesystem structure
+
+class Entry;
+
+// A regular file of the mock filesystem: a name, a modification time, and
+// optional contents and metadata (both may be null).
+struct File {
+  TimePoint mtime;
+  std::string name;
+  // Null until contents have been assigned.
+  std::shared_ptr<Buffer> data;
+  std::shared_ptr<const KeyValueMetadata> metadata;
+
+  File(TimePoint mtime, std::string name) : mtime(mtime), name(std::move(name)) {}
+
+  // Size of the contents in bytes; a file without a data buffer has size 0.
+  int64_t size() const {
+    if (data) {
+      return data->size();
+    }
+    return 0;
+  }
+
+  // View the contents as a string_view (empty when there is no data buffer).
+  // The view does not own the bytes it refers to.
+  explicit operator util::string_view() const {
+    return data ? util::string_view(*data) : util::string_view("");
+  }
+};
+
+// A directory of the mock filesystem: a name, a modification time and its
+// children, keyed (and therefore ordered) by child name. Movable, not copyable.
+struct Directory {
+  std::string name;
+  TimePoint mtime;
+  std::map<std::string, std::unique_ptr<Entry>> entries;
+
+  Directory(std::string name, TimePoint mtime) : name(std::move(name)), mtime(mtime) {}
+
+  Directory(Directory&& other) noexcept
+      : name(std::move(other.name)),
+        mtime(other.mtime),
+        entries(std::move(other.entries)) {}
+
+  Directory& operator=(Directory&& other) noexcept {
+    name = std::move(other.name);
+    mtime = other.mtime;
+    entries = std::move(other.entries);
+    return *this;
+  }
+
+  // Look up the direct child named `s`; returns nullptr when there is none.
+  Entry* Find(const std::string& s) {
+    const auto found = entries.find(s);
+    return found == entries.end() ? nullptr : found->second.get();
+  }
+
+  // Insert `entry` under the name `s` unless a child with that name already
+  // exists. Returns true if the insertion took place.
+  bool CreateEntry(const std::string& s, std::unique_ptr<Entry> entry) {
+    DCHECK(!s.empty());
+    return entries.emplace(s, std::move(entry)).second;
+  }
+
+  // Insert `entry` under the name `s`, replacing any existing child.
+  void AssignEntry(const std::string& s, std::unique_ptr<Entry> entry) {
+    DCHECK(!s.empty());
+    entries[s] = std::move(entry);
+  }
+
+  // Remove the child named `s`. Returns true if a child was removed.
+  bool DeleteEntry(const std::string& s) { return entries.erase(s) != 0; }
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Directory);
+};
+
+// A filesystem entry
+using EntryBase = util::Variant<std::nullptr_t, File, Directory>;
+
+// A node of the mock filesystem tree, holding either a File or a Directory.
+// Movable, not copyable.
+class Entry : public EntryBase {
+ public:
+  Entry(Entry&&) = default;
+  Entry& operator=(Entry&&) = default;
+  explicit Entry(Directory&& v) : EntryBase(std::move(v)) {}
+  explicit Entry(File&& v) : EntryBase(std::move(v)) {}
+
+  bool is_dir() const { return util::holds_alternative<Directory>(*this); }
+
+  bool is_file() const { return util::holds_alternative<File>(*this); }
+
+  // Access the held Directory / File. Callers check is_dir() / is_file() first.
+  Directory& as_dir() { return util::get<Directory>(*this); }
+
+  File& as_file() { return util::get<File>(*this); }
+
+  // Get info for this entry. Note the path() property isn't set.
+  FileInfo GetInfo() {
+    FileInfo info;
+    if (!is_dir()) {
+      DCHECK(is_file());
+      File& file = as_file();
+      info.set_type(FileType::File);
+      info.set_mtime(file.mtime);
+      info.set_size(file.size());
+      return info;
+    }
+    info.set_type(FileType::Directory);
+    info.set_mtime(as_dir().mtime);
+    return info;
+  }
+
+  // Get info for this entry, knowing the parent path: same as GetInfo(), plus
+  // a path obtained by joining `base_path` with the entry's own name.
+  FileInfo GetInfo(const std::string& base_path) {
+    FileInfo info = GetInfo();
+    const std::string& own_name = is_dir() ? as_dir().name : as_file().name;
+    info.set_path(ConcatAbstractPath(base_path, own_name));
+    return info;
+  }
+
+  // Set the entry name
+  void SetName(const std::string& name) {
+    if (is_dir()) {
+      as_dir().name = name;
+      return;
+    }
+    DCHECK(is_file());
+    as_file().name = name;
+  }
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Entry);
+};
+
+////////////////////////////////////////////////////////////////////////////
+// Streams
+
+// An output stream that buffers written bytes in memory and publishes them to
+// the target mock File when closed.
+//
+// The stream only keeps a raw pointer to the File.
+// NOTE(review): nothing in this class keeps the File alive, so presumably the
+// file entry must not be destroyed while the stream is open -- confirm.
+class MockFSOutputStream : public io::OutputStream {
+ public:
+  // \param file  the file whose `data` is replaced on Close() / Abort()
+  // \param pool  memory pool used by the internal buffer builder
+  MockFSOutputStream(File* file, MemoryPool* pool)
+      : file_(file), builder_(pool), closed_(false) {}
+
+  ~MockFSOutputStream() override = default;
+
+  // Implement the OutputStream interface
+
+  // Finish the builder into file_->data and mark the stream closed.
+  // Closing an already closed stream is a no-op.
+  Status Close() override {
+    if (!closed_) {
+      RETURN_NOT_OK(builder_.Finish(&file_->data));
+      closed_ = true;
+    }
+    return Status::OK();
+  }
+
+  // Discard the buffered bytes and replace the file contents with a marker
+  // message. Aborting an already closed stream is a no-op.
+  Status Abort() override {
+    if (!closed_) {
+      // MockFSOutputStream is mainly used for debugging and testing, so
+      // mark an aborted file's contents explicitly.
+      //
+      // Report the number of bytes held by this stream's builder rather than
+      // file_->size(): file_->data is only replaced in Close(), so until then
+      // it still describes whatever the file contained before the stream was
+      // opened, not what was written through the stream.
+      std::stringstream ss;
+      ss << "MockFSOutputStream aborted after " << builder_.length()
+         << " bytes written";
+      file_->data = Buffer::FromString(ss.str());
+      closed_ = true;
+    }
+    return Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  // Current write position, i.e. the number of bytes buffered so far.
+  // Fails with Status::Invalid once the stream is closed.
+  Result<int64_t> Tell() const override {
+    if (closed_) {
+      return Status::Invalid("Invalid operation on closed stream");
+    }
+    return builder_.length();
+  }
+
+  // Append `nbytes` bytes starting at `data` to the internal buffer.
+  // Fails with Status::Invalid once the stream is closed.
+  Status Write(const void* data, int64_t nbytes) override {
+    if (closed_) {
+      return Status::Invalid("Invalid operation on closed stream");
+    }
+    return builder_.Append(data, nbytes);
+  }
+
+ protected:
+  File* file_;
+  BufferBuilder builder_;
+  bool closed_;
+};
+
+// A reader over the contents of a mock File.
+//
+// Reading is provided by io::BufferReader over the file's data buffer; this
+// class additionally remembers the file's metadata so ReadMetadata() can
+// return it.
+class MockFSInputStream : public io::BufferReader {
+ public:
+ // Shares ownership of `file.data` and `file.metadata` (both are shared_ptr
+ // copies), so the reader does not refer back to `file` afterwards.
+ explicit MockFSInputStream(const File& file)
+ : io::BufferReader(file.data), metadata_(file.metadata) {}
+
+ // Return the metadata captured at construction (may be null).
+ Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+ return metadata_;
+ }
+
+ protected:
+ std::shared_ptr<const KeyValueMetadata> metadata_;
+};
+
+} // namespace
+
+// Print a MockDirInfo as `'<full_path>' [mtime=<ticks since epoch>]`.
+std::ostream& operator<<(std::ostream& os, const MockDirInfo& di) {
+  const auto mtime_ticks = di.mtime.time_since_epoch().count();
+  os << "'" << di.full_path << "' [mtime=" << mtime_ticks << "]";
+  return os;
+}
+
+// Print a MockFileInfo as
+// `'<full_path>' [mtime=<ticks since epoch>, size=<length of data>]`.
+std::ostream& operator<<(std::ostream& os, const MockFileInfo& di) {
+  const auto mtime_ticks = di.mtime.time_since_epoch().count();
+  const auto data_size = di.data.length();
+  os << "'" << di.full_path << "' [mtime=" << mtime_ticks << ", size=" << data_size
+     << "]";
+  return os;
+}
+
+////////////////////////////////////////////////////////////////////////////
+// MockFileSystem implementation
+
+// Private state and tree-walking helpers of MockFileSystem.
+//
+// None of the methods below takes `mutex` itself; the MockFileSystem methods in
+// this file acquire lock_guard() before calling into them.
+class MockFileSystem::Impl {
+ public:
+ // Timestamp given to every entry created or rewritten through this object.
+ TimePoint current_time;
+ // Memory pool handed to the output streams created by OpenOutputStream.
+ MemoryPool* pool;
+
+ // The root directory
+ Entry root;
+ std::mutex mutex;
+
+ // The root is a Directory with an empty name.
+ Impl(TimePoint current_time, MemoryPool* pool)
+ : current_time(current_time), pool(pool), root(Directory("", current_time)) {}
+
+ // Lock `mutex`; the lock is released when the returned object is destroyed.
+ std::unique_lock<std::mutex> lock_guard() {
+ return std::unique_lock<std::mutex>(mutex);
+ }
+
+ Directory& RootDir() { return root.as_dir(); }
+
+ // Walk down from the root following the path components in [it, end).
+ //
+ // Stops at the first component that does not exist, or right after reaching
+ // a file. `*nconsumed` receives the number of components that were matched,
+ // and the deepest entry reached is returned (the root if nothing matched).
+ // Never returns nullptr.
+ template <typename It>
+ Entry* FindEntry(It it, It end, size_t* nconsumed) {
+ size_t consumed = 0;
+ Entry* entry = &root;
+
+ for (; it != end; ++it) {
+ const std::string& part = *it;
+ DCHECK(entry->is_dir());
+ Entry* child = entry->as_dir().Find(part);
+ if (child == nullptr) {
+ // Partial find only
+ break;
+ }
+ ++consumed;
+ entry = child;
+ if (entry->is_file()) {
+ // Cannot go any further
+ break;
+ }
+ // Recurse
+ }
+ *nconsumed = consumed;
+ return entry;
+ }
+
+ // Find an entry, allowing partial matching
+ Entry* FindEntry(const std::vector<std::string>& parts, size_t* nconsumed) {
+ return FindEntry(parts.begin(), parts.end(), nconsumed);
+ }
+
+ // Find an entry, only full matching allowed
+ // (returns nullptr unless every component of `parts` was matched; an empty
+ // `parts` yields the root)
+ Entry* FindEntry(const std::vector<std::string>& parts) {
+ size_t consumed;
+ auto entry = FindEntry(parts, &consumed);
+ return (consumed == parts.size()) ? entry : nullptr;
+ }
+
+ // Find the parent entry, only full matching allowed
+ // (returns nullptr for an empty `parts`, or when some component before the
+ // last one does not exist; the returned entry may be a file, callers check
+ // is_dir())
+ Entry* FindParent(const std::vector<std::string>& parts) {
+ if (parts.size() == 0) {
+ return nullptr;
+ }
+ size_t consumed;
+ auto last = parts.end();
+ last--;
+ auto entry = FindEntry(parts.begin(), last, &consumed);
+ return (consumed == parts.size() - 1) ? entry : nullptr;
+ }
+
+ // Append to `infos` the FileInfo of every child of `base_dir`, with paths
+ // built from `base_path`. Descends into a child directory only when
+ // select.recursive is set and nesting_depth < select.max_recursion; a
+ // directory's info is appended before the infos of its contents.
+ void GatherInfos(const FileSelector& select, const std::string& base_path,
+ const Directory& base_dir, int32_t nesting_depth,
+ std::vector<FileInfo>* infos) {
+ for (const auto& pair : base_dir.entries) {
+ Entry* child = pair.second.get();
+ infos->push_back(child->GetInfo(base_path));
+ if (select.recursive && nesting_depth < select.max_recursion && child->is_dir()) {
+ Directory& child_dir = child->as_dir();
+ std::string child_path = infos->back().path();
+ GatherInfos(select, std::move(child_path), child_dir, nesting_depth + 1, infos);
+ }
+ }
+ }
+
+ // Append to `out` a MockDirInfo for `dir` and, recursively, for each of its
+ // subdirectories. A directory whose full path is empty (the root, when
+ // called with an empty prefix) is not listed itself.
+ void DumpDirs(const std::string& prefix, const Directory& dir,
+ std::vector<MockDirInfo>* out) {
+ std::string path = prefix + dir.name;
+ if (!path.empty()) {
+ out->push_back({path, dir.mtime});
+ path += "/";
+ }
+ for (const auto& pair : dir.entries) {
+ Entry* child = pair.second.get();
+ if (child->is_dir()) {
+ DumpDirs(path, child->as_dir(), out);
+ }
+ }
+ }
+
+ // Append to `out` a MockFileInfo for every file found in `dir` and,
+ // recursively, in its subdirectories. The `data` view of each result refers
+ // to the file's buffer and does not own it.
+ void DumpFiles(const std::string& prefix, const Directory& dir,
+ std::vector<MockFileInfo>* out) {
+ std::string path = prefix + dir.name;
+ if (!path.empty()) {
+ path += "/";
+ }
+ for (const auto& pair : dir.entries) {
+ Entry* child = pair.second.get();
+ if (child->is_file()) {
+ auto& file = child->as_file();
+ out->push_back({path + file.name, file.mtime, util::string_view(file)});
+ } else if (child->is_dir()) {
+ DumpFiles(path, child->as_dir(), out);
+ }
+ }
+ }
+
+ // Open the file at `path` for writing, creating it if needed.
+ //
+ // Fails with PathNotFound when the parent directory does not exist (or is
+ // not a directory) and with NotAFile when `path` names a directory. An
+ // existing file gets its mtime reset to current_time; in all cases the
+ // file's metadata is replaced by `metadata`. With `append`, the existing
+ // contents (if any) are first written into the new stream. The file's data
+ // buffer itself is left untouched here.
+ Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+ const std::string& path, bool append,
+ const std::shared_ptr<const KeyValueMetadata>& metadata) {
+ auto parts = SplitAbstractPath(path);
+ RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+ Entry* parent = FindParent(parts);
+ if (parent == nullptr || !parent->is_dir()) {
+ return PathNotFound(path);
+ }
+ // Find the file in the parent dir, or create it
+ const auto& name = parts.back();
+ Entry* child = parent->as_dir().Find(name);
+ File* file;
+ if (child == nullptr) {
+ // Ownership of the new entry passes to the parent directory on the next
+ // line; `child` stays valid as a non-owning pointer.
+ child = new Entry(File(current_time, name));
+ parent->as_dir().AssignEntry(name, std::unique_ptr<Entry>(child));
+ file = &child->as_file();
+ } else if (child->is_file()) {
+ file = &child->as_file();
+ file->mtime = current_time;
+ } else {
+ return NotAFile(path);
+ }
+ file->metadata = metadata;
+ auto ptr = std::make_shared<MockFSOutputStream>(file, pool);
+ if (append && file->data) {
+ RETURN_NOT_OK(ptr->Write(file->data->data(), file->data->size()));
+ }
+ return ptr;
+ }
+
+ // Open the file at `path` for reading.
+ //
+ // Fails with PathNotFound when no entry exists at `path` and with NotAFile
+ // when the entry is not a file.
+ Result<std::shared_ptr<io::BufferReader>> OpenInputReader(const std::string& path) {
+ auto parts = SplitAbstractPath(path);
+ RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+ Entry* entry = FindEntry(parts);
+ if (entry == nullptr) {
+ return PathNotFound(path);
+ }
+ if (!entry->is_file()) {
+ return NotAFile(path);
+ }
+ return std::make_shared<MockFSInputStream>(entry->as_file());
+ }
+};
+
+// Defaulted here rather than in the header: Impl is only forward-declared
+// there, and destroying the std::unique_ptr<Impl> member needs the complete
+// type defined above.
+MockFileSystem::~MockFileSystem() = default;
+
+// Create an empty mock filesystem whose entries are stamped with
+// `current_time` and whose streams allocate from `io_context`'s memory pool.
+MockFileSystem::MockFileSystem(TimePoint current_time, const io::IOContext& io_context) {
+  impl_.reset(new Impl(current_time, io_context.pool()));
+}
+
+// Two mock filesystems are equal only if they are the same object.
+bool MockFileSystem::Equals(const FileSystem& other) const {
+  return &other == this;
+}
+
+// Create the directory `path`.
+//
+// Components of `path` that already exist are reused. With `recursive`, all
+// missing components are created; without it, at most the last component may
+// be missing. Fails with IOError when an existing component is not a
+// directory, or when a parent is missing and `recursive` is false. Creating a
+// directory that already exists succeeds without changing anything.
+Status MockFileSystem::CreateDir(const std::string& path, bool recursive) {
+  auto parts = SplitAbstractPath(path);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  // Locate the deepest existing entry along the path.
+  size_t consumed;
+  Entry* deepest = impl_->FindEntry(parts, &consumed);
+  if (!deepest->is_dir()) {
+    auto file_path = JoinAbstractPath(parts.begin(), parts.begin() + consumed);
+    return Status::IOError("Cannot create directory '", path, "': ", "ancestor '",
+                           file_path, "' is not a directory");
+  }
+
+  const size_t num_missing = parts.size() - consumed;
+  if (num_missing > 1 && !recursive) {
+    return Status::IOError("Cannot create directory '", path,
+                           "': ", "parent does not exist");
+  }
+
+  // Create the missing components one below the other.
+  for (auto it = parts.begin() + consumed; it != parts.end(); ++it) {
+    const std::string& dir_name = *it;
+    std::unique_ptr<Entry> new_dir(new Entry(Directory(dir_name, impl_->current_time)));
+    Entry* new_dir_ptr = new_dir.get();
+    bool inserted = deepest->as_dir().CreateEntry(dir_name, std::move(new_dir));
+    // No race condition on insertion is possible, as all operations are locked
+    DCHECK(inserted);
+    deepest = new_dir_ptr;
+  }
+  return Status::OK();
+}
+
+// Delete the directory `path` together with everything it contains.
+//
+// Fails with PathNotFound when `path` is empty, its parent is missing or the
+// directory itself does not exist, and with NotADir when `path` names a file.
+Status MockFileSystem::DeleteDir(const std::string& path) {
+  auto parts = SplitAbstractPath(path);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  Entry* parent = impl_->FindParent(parts);
+  const bool parent_is_dir = parent != nullptr && parent->is_dir();
+  if (!parent_is_dir) {
+    return PathNotFound(path);
+  }
+
+  Directory& parent_dir = parent->as_dir();
+  const std::string& dir_name = parts.back();
+  Entry* target = parent_dir.Find(dir_name);
+  if (target == nullptr) {
+    return PathNotFound(path);
+  }
+  if (!target->is_dir()) {
+    return NotADir(path);
+  }
+
+  bool deleted = parent_dir.DeleteEntry(dir_name);
+  DCHECK(deleted);
+  return Status::OK();
+}
+
+// Delete everything inside the directory `path`, keeping the directory itself.
+//
+// Fails with PathNotFound when `path` does not exist and with NotADir when it
+// names a file. A path without components (the root) is not handled here: the
+// status built by internal::InvalidDeleteDirContents is returned instead.
+Status MockFileSystem::DeleteDirContents(const std::string& path) {
+  auto parts = SplitAbstractPath(path);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  // The root directory is never emptied through this entry point
+  // (DeleteRootDirContents does that).
+  if (parts.empty()) {
+    return internal::InvalidDeleteDirContents(path);
+  }
+
+  Entry* target = impl_->FindEntry(parts);
+  if (target == nullptr) {
+    return PathNotFound(path);
+  }
+  if (!target->is_dir()) {
+    return NotADir(path);
+  }
+
+  Directory& target_dir = target->as_dir();
+  target_dir.entries.clear();
+  return Status::OK();
+}
+
+// Remove every entry of the root directory, i.e. empty the whole filesystem.
+// Always succeeds.
+Status MockFileSystem::DeleteRootDirContents() {
+  auto guard = impl_->lock_guard();
+
+  Directory& root_dir = impl_->RootDir();
+  root_dir.entries.clear();
+  return Status::OK();
+}
+
+// Delete the file `path`.
+//
+// Fails with PathNotFound when `path` is empty, its parent is missing or the
+// file itself does not exist, and with NotAFile when `path` names a directory.
+Status MockFileSystem::DeleteFile(const std::string& path) {
+  auto parts = SplitAbstractPath(path);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  Entry* parent = impl_->FindParent(parts);
+  const bool parent_is_dir = parent != nullptr && parent->is_dir();
+  if (!parent_is_dir) {
+    return PathNotFound(path);
+  }
+
+  Directory& parent_dir = parent->as_dir();
+  const std::string& file_name = parts.back();
+  Entry* target = parent_dir.Find(file_name);
+  if (target == nullptr) {
+    return PathNotFound(path);
+  }
+  if (!target->is_file()) {
+    return NotAFile(path);
+  }
+
+  bool deleted = parent_dir.DeleteEntry(file_name);
+  DCHECK(deleted);
+  return Status::OK();
+}
+
+// Describe the entry at `path`.
+//
+// A missing entry is not an error: the returned info then has type
+// FileType::NotFound. In every case the info's path is `path` as given.
+Result<FileInfo> MockFileSystem::GetFileInfo(const std::string& path) {
+  auto parts = SplitAbstractPath(path);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  FileInfo info;
+  if (Entry* found = impl_->FindEntry(parts)) {
+    info = found->GetInfo();
+  } else {
+    info.set_type(FileType::NotFound);
+  }
+  info.set_path(path);
+  return info;
+}
+
+// List the entries below `selector.base_dir`.
+//
+// When the base directory does not exist, an empty list is returned if
+// selector.allow_not_found is set and PathNotFound otherwise. A base path that
+// names a file fails with NotADir. Recursion is governed by the selector (see
+// Impl::GatherInfos).
+Result<FileInfoVector> MockFileSystem::GetFileInfo(const FileSelector& selector) {
+  auto parts = SplitAbstractPath(selector.base_dir);
+  RETURN_NOT_OK(ValidateAbstractPathParts(parts));
+
+  auto guard = impl_->lock_guard();
+
+  FileInfoVector results;
+
+  Entry* base_entry = impl_->FindEntry(parts);
+  if (base_entry == nullptr) {
+    // Base directory does not exist
+    if (!selector.allow_not_found) {
+      return PathNotFound(selector.base_dir);
+    }
+    return results;
+  }
+  if (!base_entry->is_dir()) {
+    return NotADir(selector.base_dir);
+  }
+
+  impl_->GatherInfos(selector, selector.base_dir, base_entry->as_dir(),
+                     /*nesting_depth=*/0, &results);
+  return results;
+}
+
+namespace {
+
+// Helper for binary operations (move, copy)
+//
+// Run() resolves both paths under the filesystem lock and hands the resolved
+// state to a callback as a BinaryOp value:
+//   - src_parts / dest_parts: the split path components
+//   - src_dir / dest_dir: the (existing) parent directories
+//   - src_name / dest_name: the last path components
+//   - src_entry / dest_entry: the entries found under those names in their
+//     parent directory, or nullptr when there is none
+struct BinaryOp {
+ std::vector<std::string> src_parts;
+ std::vector<std::string> dest_parts;
+ Directory& src_dir;
+ Directory& dest_dir;
+ std::string src_name;
+ std::string dest_name;
+ Entry* src_entry;
+ Entry* dest_entry;
+
+ // Validate `src` and `dest`, lock `impl`, resolve both parent directories
+ // and invoke `op_func` with the resulting BinaryOp. The lock is held while
+ // `op_func` runs, and its Status is returned.
+ //
+ // Fails (before calling `op_func`) when a path is invalid, or with
+ // PathNotFound when the parent of either path is missing or not a directory.
+ // An empty path has no parent and is therefore reported as PathNotFound too.
+ template <typename OpFunc>
+ static Status Run(MockFileSystem::Impl* impl, const std::string& src,
+ const std::string& dest, OpFunc&& op_func) {
+ auto src_parts = SplitAbstractPath(src);
+ auto dest_parts = SplitAbstractPath(dest);
+ RETURN_NOT_OK(ValidateAbstractPathParts(src_parts));
+ RETURN_NOT_OK(ValidateAbstractPathParts(dest_parts));
+
+ auto guard = impl->lock_guard();
+
+ // Both source and destination must have valid parents
+ Entry* src_parent = impl->FindParent(src_parts);
+ if (src_parent == nullptr || !src_parent->is_dir()) {
+ return PathNotFound(src);
+ }
+ Entry* dest_parent = impl->FindParent(dest_parts);
+ if (dest_parent == nullptr || !dest_parent->is_dir()) {
+ return PathNotFound(dest);
+ }
+ Directory& src_dir = src_parent->as_dir();
+ Directory& dest_dir = dest_parent->as_dir();
+ DCHECK_GE(src_parts.size(), 1);
+ DCHECK_GE(dest_parts.size(), 1);
+ const auto& src_name = src_parts.back();
+ const auto& dest_name = dest_parts.back();
+
+ // NOTE(review): src_name / dest_name are references to elements of
+ // src_parts / dest_parts, and those vectors are moved into `op` by the
+ // first two initializers below, before the references are read. This
+ // relies on vector move construction keeping element references valid;
+ // do not reorder the initializers without checking this.
+ BinaryOp op{std::move(src_parts),
+ std::move(dest_parts),
+ src_dir,
+ dest_dir,
+ src_name,
+ dest_name,
+ src_dir.Find(src_name),
+ dest_dir.Find(dest_name)};
+
+ return op_func(std::move(op));
+ }
+};
+
+} // namespace
+
+// Move (rename) the entry at `src` to `dest`.
+//
+// Fails with PathNotFound when `src` does not exist (or when BinaryOp::Run
+// cannot resolve a parent directory), and with IOError when
+//   - `dest` exists and is a directory,
+//   - `dest` exists as a file while `src` is a directory, or
+//   - `dest` lies below `src`.
+// An existing destination file is replaced.
+Status MockFileSystem::Move(const std::string& src, const std::string& dest) {
+ return BinaryOp::Run(impl_.get(), src, dest, [&](const BinaryOp& op) -> Status {
+ if (op.src_entry == nullptr) {
+ return PathNotFound(src);
+ }
+ if (op.dest_entry != nullptr) {
+ if (op.dest_entry->is_dir()) {
+ return Status::IOError("Cannot replace destination '", dest,
+ "', which is a directory");
+ }
+ if (op.dest_entry->is_file() && op.src_entry->is_dir()) {
+ return Status::IOError("Cannot replace destination '", dest,
+ "', which is a file, with directory '", src, "'");
+ }
+ }
+ if (op.src_parts.size() < op.dest_parts.size()) {
+ // Check if dest is a child of src
+ // (i.e. every component of src is a prefix of dest's components)
+ auto p =
+ std::mismatch(op.src_parts.begin(), op.src_parts.end(), op.dest_parts.begin());
+ if (p.first == op.src_parts.end()) {
+ return Status::IOError("Cannot move '", src, "' into child path '", dest, "'");
+ }
+ }
+
+ // Move original entry, fix its name
+ // The order matters: the contents are moved out of the source entry first,
+ // then the (now moved-from) source entry is erased, and only then is the
+ // new entry stored under the destination name. This also keeps a move of
+ // a path onto itself working.
+ std::unique_ptr<Entry> new_entry(new Entry(std::move(*op.src_entry)));
+ new_entry->SetName(op.dest_name);
+ bool deleted = op.src_dir.DeleteEntry(op.src_name);
+ DCHECK(deleted);
+ op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry));
+ return Status::OK();
+ });
+}
+
+// Copy the file at `src` to `dest`, replacing an existing destination file.
+//
+// Fails with PathNotFound when `src` does not exist (or when BinaryOp::Run
+// cannot resolve a parent directory), with NotAFile when `src` is not a file,
+// and with IOError when `dest` is an existing directory. The copy is made by
+// copy-constructing the File, so the data buffer and metadata pointers are
+// shared with the source rather than duplicated.
+Status MockFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  auto copy_op = [&](const BinaryOp& op) -> Status {
+    if (op.src_entry == nullptr) {
+      return PathNotFound(src);
+    }
+    if (!op.src_entry->is_file()) {
+      return NotAFile(src);
+    }
+    const bool dest_is_dir = op.dest_entry != nullptr && op.dest_entry->is_dir();
+    if (dest_is_dir) {
+      return Status::IOError("Cannot replace destination '", dest,
+                             "', which is a directory");
+    }
+
+    // Copy original entry, fix its name
+    File duplicate(op.src_entry->as_file());
+    std::unique_ptr<Entry> new_entry(new Entry(std::move(duplicate)));
+    new_entry->SetName(op.dest_name);
+    op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry));
+    return Status::OK();
+  };
+  return BinaryOp::Run(impl_.get(), src, dest, copy_op);
+}
+
+// Open the file at `path` for sequential reading.
+// Takes the filesystem lock, then defers to Impl::OpenInputReader, which
+// reports PathNotFound / NotAFile for a missing or non-file entry.
+Result<std::shared_ptr<io::InputStream>> MockFileSystem::OpenInputStream(
+ const std::string& path) {
+ auto guard = impl_->lock_guard();
+
+ return impl_->OpenInputReader(path);
+}
+
+// Open the file at `path` for random-access reading.
+// Takes the filesystem lock, then defers to Impl::OpenInputReader, which
+// reports PathNotFound / NotAFile for a missing or non-file entry.
+Result<std::shared_ptr<io::RandomAccessFile>> MockFileSystem::OpenInputFile(
+ const std::string& path) {
+ auto guard = impl_->lock_guard();
+
+ return impl_->OpenInputReader(path);
+}
+
+// Open the file at `path` for writing from scratch (append=false), creating
+// it if needed and attaching `metadata` to it.
+// Takes the filesystem lock, then defers to Impl::OpenOutputStream.
+Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenOutputStream(
+ const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+ auto guard = impl_->lock_guard();
+
+ return impl_->OpenOutputStream(path, /*append=*/false, metadata);
+}
+
+// Open the file at `path` for appending (append=true): the returned stream
+// starts out holding the file's existing contents, if any. The file is created
+// if needed and `metadata` is attached to it.
+// Takes the filesystem lock, then defers to Impl::OpenOutputStream.
+Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenAppendStream(
+ const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+ auto guard = impl_->lock_guard();
+
+ return impl_->OpenOutputStream(path, /*append=*/true, metadata);
+}
+
+// Return a MockDirInfo for every directory of the filesystem except the root
+// (see Impl::DumpDirs), collected under the filesystem lock.
+std::vector<MockDirInfo> MockFileSystem::AllDirs() {
+ auto guard = impl_->lock_guard();
+
+ std::vector<MockDirInfo> result;
+ impl_->DumpDirs("", impl_->RootDir(), &result);
+ return result;
+}
+
+// Return a MockFileInfo for every file of the filesystem (see
+// Impl::DumpFiles), collected under the filesystem lock.
+// NOTE(review): each result's `data` is a non-owning view of the file's
+// buffer, and the lock is released on return -- presumably callers must not
+// modify the filesystem while using the result; confirm.
+std::vector<MockFileInfo> MockFileSystem::AllFiles() {
+ auto guard = impl_->lock_guard();
+
+ std::vector<MockFileInfo> result;
+ impl_->DumpFiles("", impl_->RootDir(), &result);
+ return result;
+}
+
+// Create (or overwrite) the file `path` with `contents`.
+//
+// The parent directory is created first through CreateDir(parent, recursive),
+// unless `path` has no parent component. Any error from creating the
+// directory, opening the stream, writing or closing is returned.
+Status MockFileSystem::CreateFile(const std::string& path, util::string_view contents,
+                                  bool recursive) {
+  const auto parent_dir = fs::internal::GetAbstractPathParent(path).first;
+  if (!parent_dir.empty()) {
+    RETURN_NOT_OK(CreateDir(parent_dir, recursive));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto stream, OpenOutputStream(path));
+  RETURN_NOT_OK(stream->Write(contents));
+  return stream->Close();
+}
+
+// Build a MockFileSystem populated from `infos`.
+//
+// Directory infos become directories and file infos become empty files; in
+// both cases missing parents are created. Infos of any other type are
+// ignored. The first failing creation aborts the construction and its error
+// is returned.
+Result<std::shared_ptr<FileSystem>> MockFileSystem::Make(
+    TimePoint current_time, const std::vector<FileInfo>& infos) {
+  auto mock_fs = std::make_shared<MockFileSystem>(current_time);
+  for (const auto& info : infos) {
+    const auto type = info.type();
+    if (type == FileType::Directory) {
+      RETURN_NOT_OK(mock_fs->CreateDir(info.path(), /*recursive*/ true));
+    } else if (type == FileType::File) {
+      RETURN_NOT_OK(mock_fs->CreateFile(info.path(), "", /*recursive*/ true));
+    }
+  }
+
+  return mock_fs;
+}
+
+// Produce the result of GetFileInfo(select) as a generator.
+//
+// On success the generator yields one single-element FileInfoVector per entry;
+// on failure a failing generator built from the error result is returned.
+FileInfoGenerator MockAsyncFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_infos = GetFileInfo(select);
+  if (!maybe_infos.ok()) {
+    return MakeFailingGenerator(maybe_infos);
+  }
+
+  // Return the FileInfo entries one by one
+  const auto& infos = *maybe_infos;
+  std::vector<FileInfoVector> chunks;
+  chunks.reserve(infos.size());
+  for (const FileInfo& info : infos) {
+    chunks.push_back(FileInfoVector{info});
+  }
+  return MakeVectorGenerator(std::move(chunks));
+}
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h
new file mode 100644
index 00000000000..378f30d295d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/windows_fixup.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+// Description of one directory of a mock filesystem: its full path and its
+// modification time.
+struct MockDirInfo {
+  std::string full_path;
+  TimePoint mtime;
+
+  // Equal when both the modification time and the full path match.
+  bool operator==(const MockDirInfo& other) const {
+    if (!(mtime == other.mtime)) {
+      return false;
+    }
+    return full_path == other.full_path;
+  }
+
+  friend ARROW_EXPORT std::ostream& operator<<(std::ostream&, const MockDirInfo&);
+};
+
+// Description of one file of a mock filesystem: its full path, its
+// modification time and a view of its contents.
+struct MockFileInfo {
+  std::string full_path;
+  TimePoint mtime;
+  // A view only: the bytes are not owned by this struct.
+  util::string_view data;
+
+  // Equal when the modification time, the full path and the viewed bytes all
+  // match.
+  bool operator==(const MockFileInfo& other) const {
+    if (!(mtime == other.mtime)) {
+      return false;
+    }
+    if (!(full_path == other.full_path)) {
+      return false;
+    }
+    return data == other.data;
+  }
+
+  friend ARROW_EXPORT std::ostream& operator<<(std::ostream&, const MockFileInfo&);
+};
+
+/// A mock FileSystem implementation that holds its contents in memory.
+///
+/// Useful for validating the FileSystem API, writing conformance suite,
+/// and bootstrapping FileSystem-based APIs.
+class ARROW_EXPORT MockFileSystem : public FileSystem {
+ public:
+ /// \param current_time timestamp recorded as the mtime of entries created
+ /// or rewritten through this filesystem
+ explicit MockFileSystem(TimePoint current_time,
+ const io::IOContext& = io::default_io_context());
+ ~MockFileSystem() override;
+
+ /// Always "mock".
+ std::string type_name() const override { return "mock"; }
+
+ /// Identity comparison: true only when `other` is this very object.
+ bool Equals(const FileSystem& other) const override;
+
+ // XXX It's not very practical to have to explicitly declare inheritance
+ // of default overrides.
+ using FileSystem::GetFileInfo;
+ Result<FileInfo> GetFileInfo(const std::string& path) override;
+ Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ Status DeleteDir(const std::string& path) override;
+ Status DeleteDirContents(const std::string& path) override;
+ Status DeleteRootDirContents() override;
+
+ Status DeleteFile(const std::string& path) override;
+
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+ const std::string& path) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+ Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+ const std::string& path,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ // Contents-dumping helpers to ease testing.
+ // Output is lexicographically-ordered by full path.
+ // (The root directory itself is not part of AllDirs(); the `data` views
+ // returned by AllFiles() refer to the files' buffers and do not own them.)
+ std::vector<MockDirInfo> AllDirs();
+ std::vector<MockFileInfo> AllFiles();
+
+ // Create a File with a content from a string.
+ // The parent directory is created through CreateDir(parent, recursive).
+ Status CreateFile(const std::string& path, util::string_view content,
+ bool recursive = true);
+
+ // Create a MockFileSystem out of (empty) FileInfo. The content of every
+ // file is empty and of size 0. All directories will be created recursively.
+ static Result<std::shared_ptr<FileSystem>> Make(TimePoint current_time,
+ const std::vector<FileInfo>& infos);
+
+ // Implementation details, defined in mockfs.cc.
+ class Impl;
+
+ protected:
+ std::unique_ptr<Impl> impl_;
+};
+
+/// A MockFileSystem that clears `default_async_is_sync_` and overrides
+/// GetFileInfoGenerator.
+///
+/// NOTE(review): `default_async_is_sync_` is inherited from outside this file;
+/// presumably clearing it makes the default async methods run off the calling
+/// thread -- confirm against FileSystem.
+class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem {
+ public:
+ explicit MockAsyncFileSystem(TimePoint current_time,
+ const io::IOContext& io_context = io::default_io_context())
+ : MockFileSystem(current_time, io_context) {
+ default_async_is_sync_ = false;
+ }
+
+ /// Yields the result of GetFileInfo(select) one entry at a time (each item is
+ /// a single-element vector), or a failing generator on error.
+ FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+};
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc
new file mode 100644
index 00000000000..f1bd5c087bf
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+
+#include "arrow/filesystem/path_util.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+// XXX How does this encode Windows UNC paths?
+
+// Break an abstract path into its separator-delimited components.
+//
+// At most one trailing and one leading separator are dropped before
+// splitting; any other empty segment (e.g. the middle of "a//b") is kept as
+// an empty string. A path that is empty after that trimming yields no
+// components at all.
+std::vector<std::string> SplitAbstractPath(const std::string& path) {
+  util::string_view rest(path);
+  if (!rest.empty() && rest.back() == kSep) {
+    rest.remove_suffix(1);
+  }
+  if (!rest.empty() && rest.front() == kSep) {
+    rest.remove_prefix(1);
+  }
+
+  std::vector<std::string> components;
+  if (rest.empty()) {
+    return components;
+  }
+
+  // Peel one component off the front per iteration; the text after the last
+  // separator (possibly empty) is the final component.
+  for (;;) {
+    const size_t sep_pos = rest.find(kSep);
+    if (sep_pos == util::string_view::npos) {
+      components.push_back(std::string(rest));
+      break;
+    }
+    components.push_back(std::string(rest.substr(0, sep_pos)));
+    rest.remove_prefix(sep_pos + 1);
+  }
+  return components;
+}
+
+// Split `s` at its last separator into (parent, basename). The separator
+// itself belongs to neither half. Without any separator the parent is empty
+// and the basename is the whole input.
+std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
+  // XXX should strip trailing slash?
+
+  const auto sep_pos = s.rfind(kSep);
+  if (sep_pos != std::string::npos) {
+    return {s.substr(0, sep_pos), s.substr(sep_pos + 1)};
+  }
+  // Empty parent
+  return {std::string(), s};
+}
+
+// Return the text following the last '.' of the final path component, or an
+// empty string when that component contains no dot.
+std::string GetAbstractPathExtension(const std::string& s) {
+  util::string_view tail(s);
+  const auto sep_pos = tail.rfind(kSep);
+  if (sep_pos != util::string_view::npos) {
+    // The tail still starts with the separator; that cannot affect the
+    // search for '.' below.
+    tail.remove_prefix(sep_pos);
+  }
+  const auto dot_pos = tail.rfind('.');
+  if (dot_pos != util::string_view::npos) {
+    return std::string(tail.substr(dot_pos + 1));
+  }
+  // Empty extension
+  return std::string();
+}
+
+// Check that every component is non-empty and free of separators.
+// Returns Status::Invalid for the first offending component, OK otherwise.
+Status ValidateAbstractPathParts(const std::vector<std::string>& parts) {
+  for (auto it = parts.begin(); it != parts.end(); ++it) {
+    const std::string& component = *it;
+    if (component.empty()) {
+      return Status::Invalid("Empty path component");
+    }
+    const bool has_separator = component.find(kSep) != std::string::npos;
+    if (has_separator) {
+      return Status::Invalid("Separator in component '", component, "'");
+    }
+  }
+  return Status::OK();
+}
+
+// Append `stem` (asserted non-empty in debug builds) to `base`.
+// An empty base returns the stem untouched; otherwise exactly one separator
+// is ensured at the end of base and all leading separators of stem are dropped.
+std::string ConcatAbstractPath(const std::string& base, const std::string& stem) {
+  DCHECK(!stem.empty());
+  if (!base.empty()) {
+    const std::string prefix = EnsureTrailingSlash(base);
+    return prefix + std::string(RemoveLeadingSlash(stem));
+  }
+  return stem;
+}
+
+// Copy `v`, appending a separator when the input is non-empty and does not
+// already end with one. An empty input stays empty.
+std::string EnsureTrailingSlash(util::string_view v) {
+  std::string result(v);
+  // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
+  // Unless the local filesystem always uses absolute paths
+  if (!result.empty() && result.back() != kSep) {
+    result.push_back(kSep);
+  }
+  return result;
+}
+
+// Copy `v`, prepending a separator unless the input already starts with one.
+// An empty input becomes a lone separator.
+std::string EnsureLeadingSlash(util::string_view v) {
+  const bool already_rooted = !v.empty() && v.front() == kSep;
+  if (already_rooted) {
+    return std::string(v);
+  }
+  // XXX How about "C:" on Windows? We probably don't want to turn it into "/C:"...
+  std::string result(1, kSep);
+  result.append(v.data(), v.size());
+  return result;
+}
+// Return the prefix of `key` that remains once every trailing separator has
+// been dropped. The result is a view into the same storage as the argument.
+util::string_view RemoveTrailingSlash(util::string_view key) {
+  const auto last_kept = key.find_last_not_of(kSep);
+  if (last_kept == util::string_view::npos) {
+    // Nothing but separators (or empty input)
+    return key.substr(0, 0);
+  }
+  return key.substr(0, last_kept + 1);
+}
+
+// Return the suffix of `key` that remains once every leading separator has
+// been dropped. The result is a view into the same storage as the argument.
+util::string_view RemoveLeadingSlash(util::string_view key) {
+  // npos (all separators, or empty input) is clamped to "drop everything".
+  const auto first_kept = std::min(key.find_first_not_of(kSep), key.size());
+  key.remove_prefix(first_kept);
+  return key;
+}
+
+// Express `path` relative to `base`.
+//
+// `base` must be non-empty and start with a separator, otherwise
+// Status::Invalid is returned. `path` must start with `base` (trailing
+// separators of base ignored) and the match must end on a component
+// boundary; otherwise Status::Invalid is returned. On success the remainder
+// is returned with its leading separators removed.
+Result<std::string> MakeAbstractPathRelative(const std::string& base,
+                                             const std::string& path) {
+  const bool base_is_absolute = !base.empty() && base.front() == kSep;
+  if (!base_is_absolute) {
+    return Status::Invalid("MakeAbstractPathRelative called with non-absolute base '",
+                           base, "'");
+  }
+  const auto not_relative = [&]() {
+    return Status::Invalid("Path '", path, "' is not relative to '", base, "'");
+  };
+
+  const std::string prefix = EnsureLeadingSlash(RemoveTrailingSlash(base));
+  util::string_view remainder(path);
+  if (remainder.substr(0, prefix.size()) != util::string_view(prefix)) {
+    return not_relative();
+  }
+  remainder.remove_prefix(prefix.size());
+
+  // Reject matches that stop mid-component, e.g. base "/ab" vs path "/abc".
+  const bool on_boundary =
+      remainder.empty() || remainder.front() == kSep || prefix.back() == kSep;
+  if (!on_boundary) {
+    return not_relative();
+  }
+  return std::string(RemoveLeadingSlash(remainder));
+}
+
+// Tell whether `ancestor` is `descendant` itself or one of its parent
+// directories. Trailing separators on either argument are ignored, and an
+// ancestor that is empty after that trimming matches everything.
+bool IsAncestorOf(util::string_view ancestor, util::string_view descendant) {
+  const auto parent = RemoveTrailingSlash(ancestor);
+  if (parent.empty()) {
+    // everything is a descendant of the root directory
+    return true;
+  }
+
+  auto child = RemoveTrailingSlash(descendant);
+  if (!child.starts_with(parent)) {
+    // an ancestor path is a prefix of descendant paths
+    return false;
+  }
+  child.remove_prefix(parent.size());
+
+  // Either the paths are equal ("/hello" is an ancestor of "/hello"), or the
+  // match must stop at a separator ("/hello/w" is not an ancestor of
+  // "/hello/world").
+  return child.empty() || child.front() == kSep;
+}
+
+// Strip `ancestor` from the front of `descendant`.
+//
+// Returns nullopt when IsAncestorOf(ancestor, descendant) is false. Otherwise
+// returns the part of `descendant` that follows the ancestor, with leading
+// separators removed (empty when both name the same location). The result is
+// a view into `descendant`'s storage.
+util::optional<util::string_view> RemoveAncestor(util::string_view ancestor,
+                                                 util::string_view descendant) {
+  if (!IsAncestorOf(ancestor, descendant)) {
+    return util::nullopt;
+  }
+
+  // IsAncestorOf compares with trailing separators stripped, so the matched
+  // prefix is the *stripped* ancestor. Using the unstripped length would skip
+  // too much ("a//" vs "a/b") or even run past the end of descendant
+  // ("a/" vs "a"), which makes substr() fail.
+  const auto matched_prefix = RemoveTrailingSlash(ancestor);
+  auto relative_to_ancestor = descendant.substr(matched_prefix.size());
+  return RemoveLeadingSlash(relative_to_ancestor);
+}
+
+// List the directories strictly between `base_path` and `descendant`,
+// outermost first, each spelled as base_path joined with the intermediate
+// segments. Returns an empty vector when `base_path` is not an ancestor of
+// `descendant`, when both name the same location, or when `descendant` is a
+// direct child of `base_path`.
+std::vector<std::string> AncestorsFromBasePath(util::string_view base_path,
+                                               util::string_view descendant) {
+  std::vector<std::string> ancestry;
+  if (auto relative = RemoveAncestor(base_path, descendant)) {
+    auto relative_segments = fs::internal::SplitAbstractPath(std::string(*relative));
+
+    // descendant == base_path leaves no segments at all; pop_back() on an
+    // empty vector would be undefined behaviour, so bail out first.
+    if (relative_segments.empty()) {
+      return ancestry;
+    }
+
+    // the last segment indicates descendant
+    relative_segments.pop_back();
+
+    // Reserving up front guarantees push_back never reallocates, so the view
+    // taken from ancestry.back() below stays valid across iterations.
+    ancestry.reserve(relative_segments.size());
+    for (auto& relative_segment : relative_segments) {
+      ancestry.push_back(JoinAbstractPath(
+          std::vector<std::string>{std::string(base_path), std::move(relative_segment)}));
+      // Each ancestor becomes the base for the next, deeper one.
+      base_path = ancestry.back();
+    }
+  }
+  return ancestry;
+}
+
+// Reduce a list of directories to be created by dropping entries that are
+// ancestors of the entry sorted right after them (IsAncestorOf also treats
+// equal paths as ancestors, so adjacent duplicates collapse too).
+// Takes `dirs` by value, sorts it, edits it in place and returns it.
+std::vector<std::string> MinimalCreateDirSet(std::vector<std::string> dirs) {
+ // Sorting lexicographically is what lets the scan below compare only
+ // neighbouring entries.
+ std::sort(dirs.begin(), dirs.end());
+
+ for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) {
+ auto descendant = ancestor;
+ auto descendants_end = descendant + 1;
+
+ // Walk forward while each entry is an ancestor of the next one, i.e. follow
+ // a chain parent -> child -> grandchild ...
+ while (descendants_end != dirs.end() && IsAncestorOf(*descendant, *descendants_end)) {
+ ++descendant;
+ ++descendants_end;
+ }
+
+ // Erase every element of the chain except its last (deepest) one. erase()
+ // returns an iterator to that survivor, so the loop's ++ resumes after it.
+ ancestor = dirs.erase(ancestor, descendants_end - 1);
+ }
+
+ // the root directory need not be created
+ // (IsAncestorOf(x, "") only holds when x is empty or made up solely of
+ // separators.)
+ if (dirs.size() == 1 && IsAncestorOf(dirs[0], "")) {
+ return {};
+ }
+
+ return dirs;
+}
+
+// Return a copy of `v` with every '/' replaced by '\\', on all platforms.
+std::string ToBackslashes(util::string_view v) {
+  std::string converted(v);
+  std::replace(converted.begin(), converted.end(), '/', '\\');
+  return converted;
+}
+
+// Return a copy of `v`; when built with _WIN32 defined, every '\\' in the
+// copy is replaced by '/'. Elsewhere the copy is returned unchanged.
+std::string ToSlashes(util::string_view v) {
+  std::string converted(v);
+#ifdef _WIN32
+  std::replace(converted.begin(), converted.end(), '\\', '/');
+#endif
+  return converted;
+}
+
+// True when `v` contains no character other than '/', which includes the
+// empty string.
+bool IsEmptyPath(util::string_view v) {
+  return std::all_of(v.begin(), v.end(), [](char c) { return c == '/'; });
+}
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h
new file mode 100644
index 00000000000..5701c11b5d8
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+constexpr char kSep = '/';
+
+// Computations on abstract paths (not local paths with system-dependent behaviour).
+// Abstract paths are typically used in URIs.
+
+// Split an abstract path into its individual components.
+ARROW_EXPORT
+std::vector<std::string> SplitAbstractPath(const std::string& s);
+
+// Return the extension of the file
+ARROW_EXPORT
+std::string GetAbstractPathExtension(const std::string& s);
+
+// Return the parent directory and basename of an abstract path. Both values may be
+// empty.
+ARROW_EXPORT
+std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s);
+
+// Validate the components of an abstract path.
+ARROW_EXPORT
+Status ValidateAbstractPathParts(const std::vector<std::string>& parts);
+
+// Append a non-empty stem to an abstract path.
+ARROW_EXPORT
+std::string ConcatAbstractPath(const std::string& base, const std::string& stem);
+
+// Make path relative to base, if it starts with base. Otherwise error out.
+ARROW_EXPORT
+Result<std::string> MakeAbstractPathRelative(const std::string& base,
+ const std::string& path);
+
+// Return a copy of `s` that starts with a separator; an empty input yields a
+// lone separator.
+ARROW_EXPORT
+std::string EnsureLeadingSlash(util::string_view s);
+
+// Return a view of `s` with all leading separators removed.
+ARROW_EXPORT
+util::string_view RemoveLeadingSlash(util::string_view s);
+
+// Return a copy of `s` that ends with a separator, except that an empty input
+// is returned empty.
+ARROW_EXPORT
+std::string EnsureTrailingSlash(util::string_view s);
+
+// Return a view of `s` with all trailing separators removed.
+ARROW_EXPORT
+util::string_view RemoveTrailingSlash(util::string_view s);
+
+// Return whether `ancestor` equals `descendant` or is one of its parent
+// directories. Trailing separators are ignored on both arguments, and an
+// empty ancestor matches every path.
+ARROW_EXPORT
+bool IsAncestorOf(util::string_view ancestor, util::string_view descendant);
+
+// If `ancestor` is an ancestor of `descendant` (see IsAncestorOf), return the
+// rest of `descendant` without leading separators; otherwise return nullopt.
+ARROW_EXPORT
+util::optional<util::string_view> RemoveAncestor(util::string_view ancestor,
+ util::string_view descendant);
+
+/// Return a vector of ancestors between a base path and a descendant.
+/// For example,
+///
+/// AncestorsFromBasePath("a/b", "a/b/c/d/e") -> ["a/b/c", "a/b/c/d"]
+ARROW_EXPORT
+std::vector<std::string> AncestorsFromBasePath(util::string_view base_path,
+ util::string_view descendant);
+
+/// Given a vector of paths of directories which must be created, produce the minimal
+/// subset for passing to CreateDir(recursive=true) by removing redundant parent
+/// directories
+ARROW_EXPORT
+std::vector<std::string> MinimalCreateDirSet(std::vector<std::string> dirs);
+
+// Join the components of an abstract path.
+//
+// Components in [it, end) are concatenated with a single separator between
+// them. Empty components are skipped, and no separator is emitted before the
+// first non-empty component.
+template <class StringIt>
+std::string JoinAbstractPath(StringIt it, StringIt end) {
+  std::string joined;
+  for (; it != end; ++it) {
+    if (!it->empty()) {
+      if (!joined.empty()) {
+        joined += kSep;
+      }
+      joined += *it;
+    }
+  }
+  return joined;
+}
+
+// Convenience overload: joins every element of `range`, which must provide
+// begin() and end(), by forwarding to the iterator-pair overload.
+template <class StringRange>
+std::string JoinAbstractPath(const StringRange& range) {
+ return JoinAbstractPath(range.begin(), range.end());
+}
+
+/// Convert slashes to backslashes, on all platforms. Mostly useful for testing.
+ARROW_EXPORT
+std::string ToBackslashes(util::string_view s);
+
+/// Ensure a local path is abstract, by converting backslashes to regular slashes
+/// on Windows. Return the path unchanged on other systems.
+ARROW_EXPORT
+std::string ToSlashes(util::string_view s);
+
+ARROW_EXPORT
+bool IsEmptyPath(util::string_view s);
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h
new file mode 100644
index 00000000000..112563577db
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace arrow {
+namespace fs {
+
+/// \brief FileSystem entry type
+///
+/// Stored in a single signed byte (underlying type int8_t).
+// NOTE(review): int8_t is used here but no #include appears between the
+// `#pragma once` of this header and this enum -- confirm that <cstdint> is
+// reliably available to every includer, or include it here.
+enum class FileType : int8_t {
+ /// Entry is not found
+ NotFound,
+ /// Entry exists but its type is unknown
+ ///
+ /// This can designate a special file such as a Unix socket or character
+ /// device, or Windows NUL / CON / ...
+ Unknown,
+ /// Entry is a regular file
+ File,
+ /// Entry is a directory
+ Directory
+};
+
+struct FileInfo;
+
+struct FileSelector;
+
+class FileSystem;
+class SubTreeFileSystem;
+class SlowFileSystem;
+class LocalFileSystem;
+class S3FileSystem;
+
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc
new file mode 100644
index 00000000000..8f86707375d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/buffer.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+// Read std::chrono::system_clock and return the current time converted to
+// TimePoint's own duration type.
+TimePoint CurrentTimePoint() {
+  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_epoch));
+}
+
+// Copy everything readable from `src` into `dest`.
+//
+// A scratch buffer of `chunk_size` bytes is allocated from io_context's
+// memory pool; data is then moved in reads of at most `chunk_size` bytes
+// until a read returns 0 bytes. The first failing allocation, read or write
+// status is returned as-is.
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+                  const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size,
+                  const io::IOContext& io_context) {
+  ARROW_ASSIGN_OR_RAISE(auto scratch, AllocateBuffer(chunk_size, io_context.pool()));
+
+  int64_t n_read = 0;
+  do {
+    ARROW_ASSIGN_OR_RAISE(n_read, src->Read(chunk_size, scratch->mutable_data()));
+    // A zero-byte read signals EOF: nothing to write, and the loop ends.
+    if (n_read != 0) {
+      RETURN_NOT_OK(dest->Write(scratch->data(), n_read));
+    }
+  } while (n_read != 0);
+
+  return Status::OK();
+}
+
+// Build the IOError status reported for a path that does not exist.
+Status PathNotFound(const std::string& path) {
+ return Status::IOError("Path does not exist '", path, "'");
+}
+
+// Build the IOError status reported when `path` is not a directory.
+Status NotADir(const std::string& path) {
+ return Status::IOError("Not a directory: '", path, "'");
+}
+
+// Build the IOError status reported when `path` is not a regular file.
+Status NotAFile(const std::string& path) {
+ return Status::IOError("Not a regular file: '", path, "'");
+}
+
+// Build the Invalid status for a DeleteDirContents call on a path it refuses;
+// the message points the caller to DeleteRootDirContents for the root case.
+Status InvalidDeleteDirContents(const std::string& path) {
+ return Status::Invalid(
+ "DeleteDirContents called on invalid path '", path, "'. ",
+ "If you wish to delete the root directory's contents, call DeleteRootDirContents.");
+}
+
+FileSystemGlobalOptions global_options;
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h
new file mode 100644
index 00000000000..915c8d03d46
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+ARROW_EXPORT
+TimePoint CurrentTimePoint();
+
+ARROW_EXPORT
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+ const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size,
+ const io::IOContext& io_context);
+
+ARROW_EXPORT
+Status PathNotFound(const std::string& path);
+
+ARROW_EXPORT
+Status NotADir(const std::string& path);
+
+ARROW_EXPORT
+Status NotAFile(const std::string& path);
+
+ARROW_EXPORT
+Status InvalidDeleteDirContents(const std::string& path);
+
+extern FileSystemGlobalOptions global_options;
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc b/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc
new file mode 100644
index 00000000000..117b82df30d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc
@@ -0,0 +1,940 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <sstream>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_dict.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/array/builder_dict.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_time.h"
+#include "arrow/array/builder_union.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/value_parsing.h"
+
+#include "arrow/json/rapidjson_defs.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/reader.h>
+#include <rapidjson/writer.h>
+
+namespace rj = arrow::rapidjson;
+
+namespace arrow {
+
+using internal::ParseValue;
+
+namespace ipc {
+namespace internal {
+namespace json {
+
+using ::arrow::internal::checked_cast;
+using ::arrow::internal::checked_pointer_cast;
+
+namespace {
+
+constexpr auto kParseFlags = rj::kParseFullPrecisionFlag | rj::kParseNanAndInfFlag;
+
+// Build the Invalid status used when a JSON value has the wrong type.
+// `expected_type` is a human-readable name; `json_type` is the rapidjson type
+// tag of the value actually seen and is appended to the message as-is.
+Status JSONTypeError(const char* expected_type, rj::Type json_type) {
+ return Status::Invalid("Expected ", expected_type, " or null, got JSON type ",
+ json_type);
+}
+
+// Abstract interface for turning parsed JSON values into an Arrow array by
+// feeding them to an ArrayBuilder.
+class Converter {
+ public:
+ virtual ~Converter() = default;
+
+ // Optional second-phase setup hook; the default does nothing and succeeds.
+ virtual Status Init() { return Status::OK(); }
+
+ // Append a single JSON value to the builder.
+ virtual Status AppendValue(const rj::Value& json_obj) = 0;
+
+ // Append a null slot to the underlying builder.
+ Status AppendNull() { return this->builder()->AppendNull(); }
+
+ // Append the elements of a JSON array to the builder.
+ virtual Status AppendValues(const rj::Value& json_array) = 0;
+
+ // The builder that accumulates the converted values.
+ virtual std::shared_ptr<ArrayBuilder> builder() = 0;
+
+ // Finish the builder and store the resulting array in `*out`.
+ virtual Status Finish(std::shared_ptr<Array>* out) {
+ auto builder = this->builder();
+ if (builder->length() == 0) {
+ // Make sure the builder was initialized
+ RETURN_NOT_OK(builder->Resize(1));
+ }
+ return builder->Finish(out);
+ }
+
+ protected:
+ // Data type this converter was created for; set by derived constructors.
+ std::shared_ptr<DataType> type_;
+};
+
+Status GetConverter(const std::shared_ptr<DataType>&, std::shared_ptr<Converter>* out);
+
+// CRTP
+//
+// Shared base of the concrete converters: provides the array-level append
+// loop (calling Derived::AppendValue per element) plus small helpers for
+// resolving the value type and creating a typed builder.
+template <class Derived>
+class ConcreteConverter : public Converter {
+ public:
+  // Append each element of `json_array` in order, stopping at the first
+  // error. A non-array argument is rejected with a type error.
+  Status AppendValues(const rj::Value& json_array) override {
+    auto derived = static_cast<Derived*>(this);
+    if (!json_array.IsArray()) {
+      return JSONTypeError("array", json_array.GetType());
+    }
+    const auto count = json_array.Size();
+    for (uint32_t idx = 0; idx != count; ++idx) {
+      RETURN_NOT_OK(derived->AppendValue(json_array[idx]));
+    }
+    return Status::OK();
+  }
+
+  // The type of the stored values: the dictionary's value type when type_ is
+  // a dictionary type, type_ itself otherwise.
+  const std::shared_ptr<DataType>& value_type() {
+    if (type_->id() == Type::DICTIONARY) {
+      return checked_cast<const DictionaryType&>(*type_).value_type();
+    }
+    return type_;
+  }
+
+  // Create a builder for type_ on the default memory pool and hand it back
+  // through `out`, cast to the requested concrete builder class.
+  template <typename BuilderType>
+  Status MakeConcreteBuilder(std::shared_ptr<BuilderType>* out) {
+    std::unique_ptr<ArrayBuilder> generic_builder;
+    RETURN_NOT_OK(MakeBuilder(default_memory_pool(), this->type_, &generic_builder));
+    *out = checked_pointer_cast<BuilderType>(std::move(generic_builder));
+    DCHECK(*out);
+    return Status::OK();
+  }
+};
+
+// ------------------------------------------------------------------------
+// Converter for null arrays
+
+// Accepts only JSON null and appends it to a NullBuilder.
+class NullConverter final : public ConcreteConverter<NullConverter> {
+ public:
+  explicit NullConverter(const std::shared_ptr<DataType>& type)
+      : builder_(std::make_shared<NullBuilder>()) {
+    type_ = type;
+  }
+
+  // Anything other than JSON null is a type error.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (!json_obj.IsNull()) {
+      return JSONTypeError("null", json_obj.GetType());
+    }
+    return AppendNull();
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<NullBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for boolean arrays
+
+// Accepts JSON null, JSON booleans, and JSON integers (non-zero meaning
+// true), appending to a BooleanBuilder.
+class BooleanConverter final : public ConcreteConverter<BooleanConverter> {
+ public:
+  explicit BooleanConverter(const std::shared_ptr<DataType>& type)
+      : builder_(std::make_shared<BooleanBuilder>()) {
+    type_ = type;
+  }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return AppendNull();
+    } else if (json_obj.IsBool()) {
+      return builder_->Append(json_obj.GetBool());
+    } else if (json_obj.IsInt()) {
+      const bool truthy = json_obj.GetInt() != 0;
+      return builder_->Append(truthy);
+    } else {
+      return JSONTypeError("boolean", json_obj.GetType());
+    }
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BooleanBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Helpers for numeric converters
+
+// Convert single signed integer value (also {Date,Time}{32,64} and Timestamp)
+//
+// Writes the converted value to `*out`. A value that is not a JSON integer
+// representable as int64 yields a type error (with `*out` zeroed); a value
+// that does not survive the round trip through the target C type yields an
+// out-of-bounds error naming `type`.
+template <typename T>
+enable_if_physical_signed_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
+                                                           const DataType& type,
+                                                           typename T::c_type* out) {
+  using CType = typename T::c_type;
+  if (!json_obj.IsInt64()) {
+    *out = static_cast<CType>(0);
+    return JSONTypeError("signed int", json_obj.GetType());
+  }
+  const int64_t wide = json_obj.GetInt64();
+  *out = static_cast<CType>(wide);
+  if (*out != wide) {
+    return Status::Invalid("Value ", wide, " out of bounds for ", type);
+  }
+  return Status::OK();
+}
+
+// Convert single unsigned integer value
+//
+// Writes the converted value to `*out`. A value that is not a JSON integer
+// representable as uint64 yields a type error (with `*out` zeroed); a value
+// that does not survive the round trip through the target C type yields an
+// out-of-bounds error naming `type`.
+template <typename T>
+enable_if_physical_unsigned_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
+                                                             const DataType& type,
+                                                             typename T::c_type* out) {
+  using CType = typename T::c_type;
+  if (!json_obj.IsUint64()) {
+    *out = static_cast<CType>(0);
+    return JSONTypeError("unsigned int", json_obj.GetType());
+  }
+  const uint64_t wide = json_obj.GetUint64();
+  *out = static_cast<CType>(wide);
+  if (*out != wide) {
+    return Status::Invalid("Value ", wide, " out of bounds for ", type);
+  }
+  return Status::OK();
+}
+
+// Convert single floating point value
+//
+// Any JSON number is accepted and cast from double to the target C type;
+// anything else yields a type error with `*out` zeroed. `type` is unused.
+template <typename T>
+enable_if_physical_floating_point<T, Status> ConvertNumber(const rj::Value& json_obj,
+                                                           const DataType& type,
+                                                           typename T::c_type* out) {
+  using CType = typename T::c_type;
+  if (!json_obj.IsNumber()) {
+    *out = static_cast<CType>(0);
+    return JSONTypeError("number", json_obj.GetType());
+  }
+  *out = static_cast<CType>(json_obj.GetDouble());
+  return Status::OK();
+}
+
+// ------------------------------------------------------------------------
+// Converter for int arrays
+
+// Converts JSON integers (or null) into an integer-backed Arrow builder.
+// The builder is created in Init(), not in the constructor.
+template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
+class IntegerConverter final
+    : public ConcreteConverter<IntegerConverter<Type, BuilderType>> {
+  using c_type = typename Type::c_type;
+
+  static constexpr auto is_signed = std::is_signed<c_type>::value;
+
+ public:
+  explicit IntegerConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }
+
+  Status Init() override { return this->MakeConcreteBuilder(&builder_); }
+
+  // Null appends a null slot; everything else goes through ConvertNumber,
+  // whose error (wrong JSON type, out of bounds) is returned unchanged.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (!json_obj.IsNull()) {
+      c_type parsed;
+      RETURN_NOT_OK(ConvertNumber<Type>(json_obj, *this->type_, &parsed));
+      return builder_->Append(parsed);
+    }
+    return this->AppendNull();
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for float arrays
+
+// Converts JSON numbers (or null) into a floating-point Arrow builder.
+// The builder is created in Init(), not in the constructor.
+template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
+class FloatConverter final : public ConcreteConverter<FloatConverter<Type, BuilderType>> {
+  using c_type = typename Type::c_type;
+
+ public:
+  explicit FloatConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }
+
+  Status Init() override { return this->MakeConcreteBuilder(&builder_); }
+
+  // Null appends a null slot; everything else goes through ConvertNumber,
+  // whose error is returned unchanged.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (!json_obj.IsNull()) {
+      c_type parsed;
+      RETURN_NOT_OK(ConvertNumber<Type>(json_obj, *this->type_, &parsed));
+      return builder_->Append(parsed);
+    }
+    return this->AppendNull();
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for decimal arrays
+
+// Converts JSON strings holding decimal literals (or null) into a decimal
+// builder. The scale parsed from each string must equal the scale of the
+// target decimal type; the parsed precision is not checked.
+template <typename DecimalSubtype, typename DecimalValue, typename BuilderType>
+class DecimalConverter final
+    : public ConcreteConverter<
+          DecimalConverter<DecimalSubtype, DecimalValue, BuilderType>> {
+ public:
+  explicit DecimalConverter(const std::shared_ptr<DataType>& type) {
+    this->type_ = type;
+    // value_type() looks through a dictionary type to the decimal beneath it.
+    decimal_type_ = &checked_cast<const DecimalSubtype&>(*this->value_type());
+  }
+
+  Status Init() override { return this->MakeConcreteBuilder(&builder_); }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (!json_obj.IsString()) {
+      return JSONTypeError("decimal string", json_obj.GetType());
+    }
+
+    const util::string_view text(json_obj.GetString(), json_obj.GetStringLength());
+    DecimalValue parsed;
+    int32_t precision, scale;
+    RETURN_NOT_OK(DecimalValue::FromString(text, &parsed, &precision, &scale));
+
+    const auto expected_scale = decimal_type_->scale();
+    if (scale != expected_scale) {
+      return Status::Invalid("Invalid scale for decimal: expected ", expected_scale,
+                             ", got ", scale);
+    }
+    return builder_->Append(parsed);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+  const DecimalSubtype* decimal_type_;
+};
+
+template <typename BuilderType = typename TypeTraits<Decimal128Type>::BuilderType>
+using Decimal128Converter = DecimalConverter<Decimal128Type, Decimal128, BuilderType>;
+template <typename BuilderType = typename TypeTraits<Decimal256Type>::BuilderType>
+using Decimal256Converter = DecimalConverter<Decimal256Type, Decimal256, BuilderType>;
+
+// ------------------------------------------------------------------------
+// Converter for timestamp arrays
+
+// Converts JSON values into a TimestampBuilder. Accepted inputs are null,
+// integers (taken as raw timestamp values) and strings (parsed by ParseValue
+// for the target timestamp type).
+class TimestampConverter final : public ConcreteConverter<TimestampConverter> {
+ public:
+  explicit TimestampConverter(const std::shared_ptr<DataType>& type)
+      : timestamp_type_{checked_cast<const TimestampType*>(type.get())},
+        builder_{std::make_shared<TimestampBuilder>(type, default_memory_pool())} {
+    this->type_ = type;
+  }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+
+    int64_t value;
+    if (json_obj.IsNumber()) {
+      RETURN_NOT_OK(ConvertNumber<Int64Type>(json_obj, *this->type_, &value));
+      return builder_->Append(value);
+    }
+    if (!json_obj.IsString()) {
+      return JSONTypeError("timestamp", json_obj.GetType());
+    }
+
+    const util::string_view text(json_obj.GetString(), json_obj.GetStringLength());
+    if (!ParseValue(*timestamp_type_, text.data(), text.size(), &value)) {
+      return Status::Invalid("couldn't parse timestamp from ", text);
+    }
+    return builder_->Append(value);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  const TimestampType* timestamp_type_;
+  std::shared_ptr<TimestampBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for day-time interval arrays
+
+// Converts JSON two-element arrays [days, milliseconds] (or null) into a
+// DayTimeIntervalBuilder. Both elements are converted as 32-bit signed ints.
+class DayTimeIntervalConverter final
+    : public ConcreteConverter<DayTimeIntervalConverter> {
+ public:
+  explicit DayTimeIntervalConverter(const std::shared_ptr<DataType>& type)
+      : builder_(std::make_shared<DayTimeIntervalBuilder>(default_memory_pool())) {
+    this->type_ = type;
+  }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (!json_obj.IsArray()) {
+      return JSONTypeError("array", json_obj.GetType());
+    }
+    if (json_obj.Size() != 2) {
+      return Status::Invalid(
+          "day time interval pair must have exactly two elements, had ", json_obj.Size());
+    }
+
+    DayTimeIntervalType::DayMilliseconds interval;
+    const rj::Value& days_json = json_obj[0];
+    const rj::Value& millis_json = json_obj[1];
+    RETURN_NOT_OK(ConvertNumber<Int32Type>(days_json, *this->type_, &interval.days));
+    RETURN_NOT_OK(
+        ConvertNumber<Int32Type>(millis_json, *this->type_, &interval.milliseconds));
+    return builder_->Append(interval);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<DayTimeIntervalBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for binary and string arrays
+
+// Converts JSON strings (or null) into a binary/string builder; the bytes of
+// the JSON string are appended as-is. The builder is created in Init().
+template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
+class StringConverter final
+    : public ConcreteConverter<StringConverter<Type, BuilderType>> {
+ public:
+  explicit StringConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }
+
+  Status Init() override { return this->MakeConcreteBuilder(&builder_); }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (!json_obj.IsString()) {
+      return JSONTypeError("string", json_obj.GetType());
+    }
+    const util::string_view text(json_obj.GetString(), json_obj.GetStringLength());
+    return builder_->Append(text);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for fixed-size binary arrays
+
+// Converter for fixed-size binary values: each non-null JSON element must be a
+// string whose length equals the builder's byte width.
+template <typename BuilderType = typename TypeTraits<FixedSizeBinaryType>::BuilderType>
+class FixedSizeBinaryConverter final
+    : public ConcreteConverter<FixedSizeBinaryConverter<BuilderType>> {
+ public:
+  explicit FixedSizeBinaryConverter(const std::shared_ptr<DataType>& type) {
+    this->type_ = type;
+  }
+
+  Status Init() override { return this->MakeConcreteBuilder(&builder_); }
+
+  // Appends a null slot for JSON null, or the string bytes after checking that
+  // the string length matches the expected width.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (!json_obj.IsString()) {
+      return JSONTypeError("string", json_obj.GetType());
+    }
+    const util::string_view contents(json_obj.GetString(), json_obj.GetStringLength());
+    const auto expected_width = static_cast<size_t>(builder_->byte_width());
+    if (contents.length() != expected_width) {
+      return Status::Invalid("Invalid string length ", contents.length(),
+                             " in JSON input for ", this->type_->ToString());
+    }
+    return builder_->Append(contents);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for list arrays
+
+// Converter for list-like types parameterized by TYPE; the list values are
+// delegated to a child converter created for the list's value type.
+template <typename TYPE>
+class ListConverter final : public ConcreteConverter<ListConverter<TYPE>> {
+ public:
+  using BuilderType = typename TypeTraits<TYPE>::BuilderType;
+
+  explicit ListConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }
+
+  // Creates the child converter first, then a list builder that wraps the
+  // child converter's builder.
+  Status Init() override {
+    const auto& concrete_type = checked_cast<const TYPE&>(*this->type_);
+    RETURN_NOT_OK(GetConverter(concrete_type.value_type(), &child_converter_));
+    builder_ = std::make_shared<BuilderType>(default_memory_pool(),
+                                             child_converter_->builder(), this->type_);
+    return Status::OK();
+  }
+
+  // Appends a null slot for JSON null; otherwise opens a new list slot and
+  // hands the JSON value to the child converter's AppendValues().
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    RETURN_NOT_OK(builder_->Append());
+    return child_converter_->AppendValues(json_obj);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<BuilderType> builder_;
+  std::shared_ptr<Converter> child_converter_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for map arrays
+
+// Converter for map arrays. A map value is written in JSON as an array of
+// `[key, item]` pairs.
+class MapConverter final : public ConcreteConverter<MapConverter> {
+ public:
+  explicit MapConverter(const std::shared_ptr<DataType>& type) { type_ = type; }
+
+  // Creates the key and item converters, then a MapBuilder over their builders.
+  Status Init() override {
+    const auto& map_type = checked_cast<const MapType&>(*type_);
+    RETURN_NOT_OK(GetConverter(map_type.key_type(), &key_converter_));
+    RETURN_NOT_OK(GetConverter(map_type.item_type(), &item_converter_));
+    builder_ = std::make_shared<MapBuilder>(
+        default_memory_pool(), key_converter_->builder(), item_converter_->builder(), type_);
+    return Status::OK();
+  }
+
+  // Appends a null slot for JSON null. Otherwise a map slot is opened first and
+  // every pair is validated (array of exactly two elements, non-null key)
+  // before its key and item are appended.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    RETURN_NOT_OK(builder_->Append());
+    if (!json_obj.IsArray()) {
+      return JSONTypeError("array", json_obj.GetType());
+    }
+    for (auto pair_it = json_obj.Begin(); pair_it != json_obj.End(); ++pair_it) {
+      const rj::Value& entry = *pair_it;
+      if (!entry.IsArray()) {
+        return JSONTypeError("array", entry.GetType());
+      }
+      if (entry.Size() != 2) {
+        return Status::Invalid("key item pair must have exactly two elements, had ",
+                               entry.Size());
+      }
+      const rj::Value& key = entry[0];
+      if (key.IsNull()) {
+        return Status::Invalid("null key is invalid");
+      }
+      RETURN_NOT_OK(key_converter_->AppendValue(key));
+      RETURN_NOT_OK(item_converter_->AppendValue(entry[1]));
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<MapBuilder> builder_;
+  std::shared_ptr<Converter> key_converter_, item_converter_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for fixed size list arrays
+
+// Converter for fixed-size list arrays; values are delegated to a child
+// converter and the element count is checked against the type's list size.
+class FixedSizeListConverter final : public ConcreteConverter<FixedSizeListConverter> {
+ public:
+  explicit FixedSizeListConverter(const std::shared_ptr<DataType>& type) { type_ = type; }
+
+  // Records the list size, creates the child converter and wraps its builder
+  // in a FixedSizeListBuilder.
+  Status Init() override {
+    const auto& fixed_type = checked_cast<const FixedSizeListType&>(*type_);
+    list_size_ = fixed_type.list_size();
+    RETURN_NOT_OK(GetConverter(fixed_type.value_type(), &child_converter_));
+    builder_ = std::make_shared<FixedSizeListBuilder>(
+        default_memory_pool(), child_converter_->builder(), type_);
+    return Status::OK();
+  }
+
+  // Appends a null slot for JSON null. Otherwise opens a list slot, forwards
+  // the JSON value to the child converter, and only afterwards verifies that
+  // the number of elements matches list_size_.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    RETURN_NOT_OK(builder_->Append());
+    RETURN_NOT_OK(child_converter_->AppendValues(json_obj));
+    const auto actual_size = json_obj.GetArray().Size();
+    if (actual_size == static_cast<rj::SizeType>(list_size_)) {
+      return Status::OK();
+    }
+    return Status::Invalid("incorrect list size ", actual_size);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  int32_t list_size_;
+  std::shared_ptr<FixedSizeListBuilder> builder_;
+  std::shared_ptr<Converter> child_converter_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for struct arrays
+
+// Converter for struct arrays, with one child converter per struct field.
+class StructConverter final : public ConcreteConverter<StructConverter> {
+ public:
+  explicit StructConverter(const std::shared_ptr<DataType>& type) { type_ = type; }
+
+  // Creates a converter for every field and a StructBuilder over their builders.
+  Status Init() override {
+    std::vector<std::shared_ptr<ArrayBuilder>> field_builders;
+    for (const auto& field : type_->fields()) {
+      std::shared_ptr<Converter> field_converter;
+      RETURN_NOT_OK(GetConverter(field->type(), &field_converter));
+      field_builders.push_back(field_converter->builder());
+      child_converters_.push_back(std::move(field_converter));
+    }
+    builder_ = std::make_shared<StructBuilder>(type_, default_memory_pool(),
+                                               std::move(field_builders));
+    return Status::OK();
+  }
+
+  // Accepts JSON null, an array holding one value per field in field order, or
+  // an object keyed by field name. In the object form, missing fields are
+  // appended as null and members that match no field are rejected.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (json_obj.IsArray()) {
+      return AppendFromArray(json_obj);
+    }
+    if (json_obj.IsObject()) {
+      return AppendFromObject(json_obj);
+    }
+    return JSONTypeError("array or object", json_obj.GetType());
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  // Positional form: the array must contain exactly one element per field.
+  Status AppendFromArray(const rj::Value& json_array) {
+    const auto actual_size = json_array.Size();
+    const auto expected_size = static_cast<uint32_t>(type_->num_fields());
+    if (actual_size != expected_size) {
+      return Status::Invalid("Expected array of size ", expected_size,
+                             ", got array of size ", actual_size);
+    }
+    for (uint32_t pos = 0; pos < actual_size; ++pos) {
+      RETURN_NOT_OK(child_converters_[pos]->AppendValue(json_array[pos]));
+    }
+    return builder_->Append();
+  }
+
+  // Keyed form: look each field up by name; `unmatched` counts the object
+  // members that have not been consumed by a field lookup.
+  Status AppendFromObject(const rj::Value& json_object) {
+    auto unmatched = json_object.MemberCount();
+    const auto field_count = type_->num_fields();
+    for (int32_t pos = 0; pos < field_count; ++pos) {
+      const auto& field = type_->field(pos);
+      const auto member = json_object.FindMember(field->name());
+      if (member == json_object.MemberEnd()) {
+        RETURN_NOT_OK(child_converters_[pos]->AppendNull());
+        continue;
+      }
+      --unmatched;
+      RETURN_NOT_OK(child_converters_[pos]->AppendValue(member->value));
+    }
+    if (unmatched > 0) {
+      // Serialize the offending object so it can be shown in the error.
+      rj::StringBuffer buffer;
+      rj::Writer<rj::StringBuffer> json_writer(buffer);
+      json_object.Accept(json_writer);
+      return Status::Invalid("Unexpected members in JSON object for type ",
+                             type_->ToString(), " Object: ", buffer.GetString());
+    }
+    return builder_->Append();
+  }
+
+  std::shared_ptr<StructBuilder> builder_;
+  std::vector<std::shared_ptr<Converter>> child_converters_;
+};
+
+// ------------------------------------------------------------------------
+// Converter for union arrays
+
+// Converter for sparse and dense union arrays. Each JSON element is either
+// null or a `[type_id, value]` pair.
+class UnionConverter final : public ConcreteConverter<UnionConverter> {
+ public:
+  explicit UnionConverter(const std::shared_ptr<DataType>& type) { type_ = type; }
+
+  // Builds the type_id -> child index lookup table, one child converter per
+  // field, and the dense or sparse union builder matching the type's mode.
+  Status Init() override {
+    auto union_type = checked_cast<const UnionType*>(type_.get());
+    mode_ = union_type->mode();
+    type_id_to_child_num_.clear();
+    // Entries for type codes the union does not declare stay at -1.
+    type_id_to_child_num_.resize(union_type->max_type_code() + 1, -1);
+    int child_i = 0;
+    for (auto type_id : union_type->type_codes()) {
+      type_id_to_child_num_[type_id] = child_i++;
+    }
+    std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
+    for (const auto& field : type_->fields()) {
+      std::shared_ptr<Converter> child_converter;
+      RETURN_NOT_OK(GetConverter(field->type(), &child_converter));
+      child_converters_.push_back(child_converter);
+      child_builders.push_back(child_converter->builder());
+    }
+    if (mode_ == UnionMode::DENSE) {
+      builder_ = std::make_shared<DenseUnionBuilder>(default_memory_pool(),
+                                                     std::move(child_builders), type_);
+    } else {
+      builder_ = std::make_shared<SparseUnionBuilder>(default_memory_pool(),
+                                                      std::move(child_builders), type_);
+    }
+    return Status::OK();
+  }
+
+  // Append a JSON value that must be a 2-long array, containing the type_id
+  // and value of the UnionArray's slot.
+  //
+  // Returns Invalid when the array does not have two elements or when the
+  // type_id is not one of the union's type codes, and a JSON type error when
+  // the element is not an array or the type_id is not an int.
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return this->AppendNull();
+    }
+    if (!json_obj.IsArray()) {
+      return JSONTypeError("array", json_obj.GetType());
+    }
+    if (json_obj.Size() != 2) {
+      return Status::Invalid("Expected [type_id, value] pair, got array of size ",
+                             json_obj.Size());
+    }
+    const auto& id_obj = json_obj[0];
+    if (!id_obj.IsInt()) {
+      return JSONTypeError("int", id_obj.GetType());
+    }
+
+    // Range-check the raw integer before narrowing it: an id that is negative
+    // or beyond the lookup table must be rejected instead of being used as an
+    // index. The raw int is also what gets reported, since streaming an int8_t
+    // would print a character rather than a number.
+    const int raw_id = id_obj.GetInt();
+    if (raw_id < 0 || static_cast<size_t>(raw_id) >= type_id_to_child_num_.size()) {
+      return Status::Invalid("type_id ", raw_id, " not found in ", *type_);
+    }
+    const auto id = static_cast<int8_t>(raw_id);
+    const auto child_num = type_id_to_child_num_[raw_id];
+    if (child_num == -1) {
+      return Status::Invalid("type_id ", raw_id, " not found in ", *type_);
+    }
+
+    auto child_converter = child_converters_[child_num];
+    if (mode_ == UnionMode::SPARSE) {
+      RETURN_NOT_OK(checked_cast<SparseUnionBuilder&>(*builder_).Append(id));
+      // In sparse mode every other child receives a null for this slot.
+      for (auto&& other_converter : child_converters_) {
+        if (other_converter != child_converter) {
+          RETURN_NOT_OK(other_converter->AppendNull());
+        }
+      }
+    } else {
+      RETURN_NOT_OK(checked_cast<DenseUnionBuilder&>(*builder_).Append(id));
+    }
+    return child_converter->AppendValue(json_obj[1]);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  UnionMode::type mode_;
+  std::shared_ptr<ArrayBuilder> builder_;
+  std::vector<std::shared_ptr<Converter>> child_converters_;
+  // Indexed by type code; holds the child index or -1 for unused codes.
+  std::vector<int8_t> type_id_to_child_num_;
+};
+
+// ------------------------------------------------------------------------
+// General conversion functions
+
+// Builds the NotImplemented status returned for types without a JSON converter.
+Status ConversionNotImplemented(const std::shared_ptr<DataType>& type) {
+  const std::string type_repr = type->ToString();
+  return Status::NotImplemented("JSON conversion to ", type_repr, " not implemented");
+}
+
+// Creates and initializes a converter for a dictionary type, choosing the
+// converter class from the dictionary's value type. Value types without a
+// case below yield NotImplemented.
+Status GetDictConverter(const std::shared_ptr<DataType>& type,
+                        std::shared_ptr<Converter>* out) {
+  const auto& dict_type = checked_cast<const DictionaryType&>(*type);
+  const auto value_type = dict_type.value_type();
+
+  std::shared_ptr<Converter> converter;
+
+// Converter templates that take only the builder type.
+#define SIMPLE_CONVERTER_CASE(ID, CLASS, TYPE)                          \
+  case ID:                                                              \
+    converter = std::make_shared<CLASS<DictionaryBuilder<TYPE>>>(type); \
+    break;
+
+// Converter templates that take the value type and the builder type.
+#define PARAM_CONVERTER_CASE(ID, CLASS, TYPE)                                 \
+  case ID:                                                                    \
+    converter = std::make_shared<CLASS<TYPE, DictionaryBuilder<TYPE>>>(type); \
+    break;
+
+  switch (value_type->id()) {
+    PARAM_CONVERTER_CASE(Type::INT8, IntegerConverter, Int8Type)
+    PARAM_CONVERTER_CASE(Type::INT16, IntegerConverter, Int16Type)
+    PARAM_CONVERTER_CASE(Type::INT32, IntegerConverter, Int32Type)
+    PARAM_CONVERTER_CASE(Type::INT64, IntegerConverter, Int64Type)
+    PARAM_CONVERTER_CASE(Type::UINT8, IntegerConverter, UInt8Type)
+    PARAM_CONVERTER_CASE(Type::UINT16, IntegerConverter, UInt16Type)
+    PARAM_CONVERTER_CASE(Type::UINT32, IntegerConverter, UInt32Type)
+    PARAM_CONVERTER_CASE(Type::UINT64, IntegerConverter, UInt64Type)
+    PARAM_CONVERTER_CASE(Type::FLOAT, FloatConverter, FloatType)
+    PARAM_CONVERTER_CASE(Type::DOUBLE, FloatConverter, DoubleType)
+    PARAM_CONVERTER_CASE(Type::STRING, StringConverter, StringType)
+    PARAM_CONVERTER_CASE(Type::BINARY, StringConverter, BinaryType)
+    PARAM_CONVERTER_CASE(Type::LARGE_STRING, StringConverter, LargeStringType)
+    PARAM_CONVERTER_CASE(Type::LARGE_BINARY, StringConverter, LargeBinaryType)
+    SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_BINARY, FixedSizeBinaryConverter,
+                          FixedSizeBinaryType)
+    SIMPLE_CONVERTER_CASE(Type::DECIMAL128, Decimal128Converter, Decimal128Type)
+    SIMPLE_CONVERTER_CASE(Type::DECIMAL256, Decimal256Converter, Decimal256Type)
+    default:
+      return ConversionNotImplemented(type);
+  }
+
+#undef SIMPLE_CONVERTER_CASE
+#undef PARAM_CONVERTER_CASE
+
+  // Only hand the converter out once it has been initialized successfully.
+  RETURN_NOT_OK(converter->Init());
+  *out = std::move(converter);
+  return Status::OK();
+}
+
+// Creates and initializes the converter for `type`. Dictionary types are
+// routed to GetDictConverter(); type ids without a case below yield
+// NotImplemented.
+Status GetConverter(const std::shared_ptr<DataType>& type,
+                    std::shared_ptr<Converter>* out) {
+  const auto type_id = type->id();
+  if (type_id == Type::DICTIONARY) {
+    return GetDictConverter(type, out);
+  }
+
+  std::shared_ptr<Converter> converter;
+
+#define SIMPLE_CONVERTER_CASE(ID, CLASS)       \
+  case ID:                                     \
+    converter = std::make_shared<CLASS>(type); \
+    break;
+
+  switch (type_id) {
+    SIMPLE_CONVERTER_CASE(Type::INT8, IntegerConverter<Int8Type>)
+    SIMPLE_CONVERTER_CASE(Type::INT16, IntegerConverter<Int16Type>)
+    SIMPLE_CONVERTER_CASE(Type::INT32, IntegerConverter<Int32Type>)
+    SIMPLE_CONVERTER_CASE(Type::INT64, IntegerConverter<Int64Type>)
+    SIMPLE_CONVERTER_CASE(Type::UINT8, IntegerConverter<UInt8Type>)
+    SIMPLE_CONVERTER_CASE(Type::UINT16, IntegerConverter<UInt16Type>)
+    SIMPLE_CONVERTER_CASE(Type::UINT32, IntegerConverter<UInt32Type>)
+    SIMPLE_CONVERTER_CASE(Type::UINT64, IntegerConverter<UInt64Type>)
+    SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, TimestampConverter)
+    SIMPLE_CONVERTER_CASE(Type::DATE32, IntegerConverter<Date32Type>)
+    SIMPLE_CONVERTER_CASE(Type::DATE64, IntegerConverter<Date64Type>)
+    SIMPLE_CONVERTER_CASE(Type::TIME32, IntegerConverter<Time32Type>)
+    SIMPLE_CONVERTER_CASE(Type::TIME64, IntegerConverter<Time64Type>)
+    SIMPLE_CONVERTER_CASE(Type::DURATION, IntegerConverter<DurationType>)
+    SIMPLE_CONVERTER_CASE(Type::NA, NullConverter)
+    SIMPLE_CONVERTER_CASE(Type::BOOL, BooleanConverter)
+    SIMPLE_CONVERTER_CASE(Type::HALF_FLOAT, IntegerConverter<HalfFloatType>)
+    SIMPLE_CONVERTER_CASE(Type::FLOAT, FloatConverter<FloatType>)
+    SIMPLE_CONVERTER_CASE(Type::DOUBLE, FloatConverter<DoubleType>)
+    SIMPLE_CONVERTER_CASE(Type::LIST, ListConverter<ListType>)
+    SIMPLE_CONVERTER_CASE(Type::LARGE_LIST, ListConverter<LargeListType>)
+    SIMPLE_CONVERTER_CASE(Type::MAP, MapConverter)
+    SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_LIST, FixedSizeListConverter)
+    SIMPLE_CONVERTER_CASE(Type::STRUCT, StructConverter)
+    SIMPLE_CONVERTER_CASE(Type::STRING, StringConverter<StringType>)
+    SIMPLE_CONVERTER_CASE(Type::BINARY, StringConverter<BinaryType>)
+    SIMPLE_CONVERTER_CASE(Type::LARGE_STRING, StringConverter<LargeStringType>)
+    SIMPLE_CONVERTER_CASE(Type::LARGE_BINARY, StringConverter<LargeBinaryType>)
+    SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_BINARY, FixedSizeBinaryConverter<>)
+    SIMPLE_CONVERTER_CASE(Type::DECIMAL128, Decimal128Converter<>)
+    SIMPLE_CONVERTER_CASE(Type::DECIMAL256, Decimal256Converter<>)
+    SIMPLE_CONVERTER_CASE(Type::SPARSE_UNION, UnionConverter)
+    SIMPLE_CONVERTER_CASE(Type::DENSE_UNION, UnionConverter)
+    SIMPLE_CONVERTER_CASE(Type::INTERVAL_MONTHS, IntegerConverter<MonthIntervalType>)
+    SIMPLE_CONVERTER_CASE(Type::INTERVAL_DAY_TIME, DayTimeIntervalConverter)
+    default:
+      return ConversionNotImplemented(type);
+  }
+
+#undef SIMPLE_CONVERTER_CASE
+
+  // Only hand the converter out once it has been initialized successfully.
+  RETURN_NOT_OK(converter->Init());
+  *out = std::move(converter);
+  return Status::OK();
+}
+
+} // namespace
+
+// Parses `json_string` and converts the resulting document into an array of
+// the given type. Returns Invalid with the parser's offset and message when
+// the text is not valid JSON.
+Status ArrayFromJSON(const std::shared_ptr<DataType>& type, util::string_view json_string,
+                     std::shared_ptr<Array>* out) {
+  std::shared_ptr<Converter> converter;
+  RETURN_NOT_OK(GetConverter(type, &converter));
+
+  rj::Document document;
+  document.Parse<kParseFlags>(json_string.data(), json_string.length());
+  if (document.HasParseError()) {
+    const auto error_offset = document.GetErrorOffset();
+    const auto* error_text = GetParseError_En(document.GetParseError());
+    return Status::Invalid("JSON parse error at offset ", error_offset, ": ",
+                           error_text);
+  }
+
+  // The whole document is handed to AppendValues() as the list of elements.
+  RETURN_NOT_OK(converter->AppendValues(document));
+  return converter->Finish(out);
+}
+
+// Convenience overload: forwards a std::string to the string_view overload.
+Status ArrayFromJSON(const std::shared_ptr<DataType>& type,
+                     const std::string& json_string, std::shared_ptr<Array>* out) {
+  const util::string_view json_view(json_string);
+  return ArrayFromJSON(type, json_view, out);
+}
+
+// Convenience overload: forwards a C string to the string_view overload.
+Status ArrayFromJSON(const std::shared_ptr<DataType>& type, const char* json_string,
+                     std::shared_ptr<Array>* out) {
+  const util::string_view json_view(json_string);
+  return ArrayFromJSON(type, json_view, out);
+}
+
+// Builds a dictionary array from two JSON texts: one for the indices and one
+// for the dictionary values. Returns TypeError when `type` is not a
+// dictionary type.
+Status DictArrayFromJSON(const std::shared_ptr<DataType>& type,
+                         util::string_view indices_json,
+                         util::string_view dictionary_json, std::shared_ptr<Array>* out) {
+  if (type->id() != Type::DICTIONARY) {
+    return Status::TypeError("DictArrayFromJSON requires dictionary type, got ", *type);
+  }
+  const auto& dict_type = checked_cast<const DictionaryType&>(*type);
+
+  // Indices are converted first, then the dictionary values.
+  std::shared_ptr<Array> index_array;
+  RETURN_NOT_OK(ArrayFromJSON(dict_type.index_type(), indices_json, &index_array));
+  std::shared_ptr<Array> value_array;
+  RETURN_NOT_OK(ArrayFromJSON(dict_type.value_type(), dictionary_json, &value_array));
+
+  auto maybe_array =
+      DictionaryArray::FromArrays(type, std::move(index_array), std::move(value_array));
+  return maybe_array.Value(out);
+}
+
+// Parses `json_string` as a single value of the given type and returns it as
+// a scalar, by building a one-element array and extracting element 0.
+Status ScalarFromJSON(const std::shared_ptr<DataType>& type,
+                      util::string_view json_string, std::shared_ptr<Scalar>* out) {
+  std::shared_ptr<Converter> converter;
+  RETURN_NOT_OK(GetConverter(type, &converter));
+
+  rj::Document document;
+  document.Parse<kParseFlags>(json_string.data(), json_string.length());
+  if (document.HasParseError()) {
+    const auto error_offset = document.GetErrorOffset();
+    const auto* error_text = GetParseError_En(document.GetParseError());
+    return Status::Invalid("JSON parse error at offset ", error_offset, ": ",
+                           error_text);
+  }
+
+  // The document itself is the single value, so AppendValue() is used here
+  // rather than AppendValues().
+  RETURN_NOT_OK(converter->AppendValue(document));
+  std::shared_ptr<Array> single_element;
+  RETURN_NOT_OK(converter->Finish(&single_element));
+  DCHECK_EQ(single_element->length(), 1);
+  return single_element->GetScalar(0).Value(out);
+}
+
+} // namespace json
+} // namespace internal
+} // namespace ipc
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc
new file mode 100644
index 00000000000..040009c764f
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/chunked_builder.h"
+
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/json/converter.h"
+#include "arrow/table.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/task_group.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+using internal::TaskGroup;
+
+namespace json {
+
+// Shared base for the chunked builders that convert blocks with a Converter:
+// it owns the converted chunks, the mutex guarding them, and the converter.
+class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
+ public:
+  NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
+                               std::shared_ptr<Converter> converter)
+      : ChunkedArrayBuilder(task_group), converter_(std::move(converter)) {}
+
+  // Waits for the task group, then moves the accumulated chunks into a
+  // ChunkedArray typed with the converter's output type. chunks_ is left empty.
+  Status Finish(std::shared_ptr<ChunkedArray>* out) override {
+    RETURN_NOT_OK(task_group_->Finish());
+    ArrayVector finished_chunks;
+    finished_chunks.swap(chunks_);
+    *out = std::make_shared<ChunkedArray>(std::move(finished_chunks),
+                                          converter_->out_type());
+    return Status::OK();
+  }
+
+  // Waits for the current task group before switching to the new one.
+  Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
+    const Status finished = task_group_->Finish();
+    if (!finished.ok()) {
+      return finished;
+    }
+    task_group_ = task_group;
+    return Status::OK();
+  }
+
+ protected:
+  ArrayVector chunks_;
+  std::mutex mutex_;
+  std::shared_ptr<Converter> converter_;
+};
+
+// Chunked builder that converts every block with one fixed converter.
+class TypedChunkedArrayBuilder
+    : public NonNestedChunkedArrayBuilder,
+      public std::enable_shared_from_this<TypedChunkedArrayBuilder> {
+ public:
+  using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder;
+
+  // Reserves the slot for `block_index`, then schedules a task that converts
+  // the block and stores the result in that slot.
+  void Insert(int64_t block_index, const std::shared_ptr<Field>&,
+              const std::shared_ptr<Array>& unconverted) override {
+    {
+      // Make sure chunks_ has a slot for this block before the task runs.
+      std::lock_guard<std::mutex> guard(mutex_);
+      const auto required_size = static_cast<size_t>(block_index) + 1;
+      if (chunks_.size() < required_size) {
+        chunks_.resize(required_size, nullptr);
+      }
+    }
+
+    // The task holds a shared_ptr to this builder.
+    auto self = shared_from_this();
+    task_group_->Append([self, block_index, unconverted] {
+      std::shared_ptr<Array> result;
+      RETURN_NOT_OK(self->converter_->Convert(unconverted, &result));
+      std::lock_guard<std::mutex> guard(self->mutex_);
+      self->chunks_[block_index] = std::move(result);
+      return Status::OK();
+    });
+  }
+};
+
+class InferringChunkedArrayBuilder  // chunked builder that swaps in a converter for a promoted type when a block fails to convert
+ : public NonNestedChunkedArrayBuilder,
+ public std::enable_shared_from_this<InferringChunkedArrayBuilder> {
+ public:
+ InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
+ const PromotionGraph* promotion_graph,
+ std::shared_ptr<Converter> converter)
+ : NonNestedChunkedArrayBuilder(task_group, std::move(converter)),
+ promotion_graph_(promotion_graph) {}
+
+ void Insert(int64_t block_index, const std::shared_ptr<Field>& unconverted_field,  // records the block and its field, then schedules its conversion
+ const std::shared_ptr<Array>& unconverted) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (chunks_.size() <= static_cast<size_t>(block_index)) {
+ chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
+ unconverted_.resize(chunks_.size(), nullptr);  // the three vectors are kept the same size
+ unconverted_fields_.resize(chunks_.size(), nullptr);
+ }
+ unconverted_[block_index] = unconverted;  // kept so the block can be converted again after a promotion
+ unconverted_fields_[block_index] = unconverted_field;
+ lock.unlock();  // released before the task is scheduled
+ ScheduleConvertChunk(block_index);
+ }
+
+ void ScheduleConvertChunk(int64_t block_index) {  // appends a TryConvertChunk(block_index) task to the task group
+ auto self = shared_from_this();  // the task holds a shared_ptr to this builder
+ task_group_->Append([self, block_index] {
+ return self->TryConvertChunk(static_cast<size_t>(block_index));
+ });
+ }
+
+ Status TryConvertChunk(size_t block_index) {  // converts one block; on failure promotes the converter and reschedules affected blocks
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto converter = converter_;  // snapshot, compared against converter_ after the conversion
+ auto unconverted = unconverted_[block_index];
+ auto unconverted_field = unconverted_fields_[block_index];
+ std::shared_ptr<Array> converted;
+
+ lock.unlock();  // Convert() runs without holding mutex_
+ Status st = converter->Convert(unconverted, &converted);
+ lock.lock();
+
+ if (converter != converter_) {
+ // converter_ was replaced while this block was converting: drop the result and convert again
+ lock.unlock();
+ ScheduleConvertChunk(block_index);
+ return Status::OK();
+ }
+
+ if (st.ok()) {
+ // conversion succeeded with the current converter: store the chunk
+ chunks_[block_index] = std::move(converted);
+ return Status::OK();
+ }
+
+ auto promoted_type =
+ promotion_graph_->Promote(converter_->out_type(), unconverted_field);
+ if (promoted_type == nullptr) {
+ // conversion failed and no promoted type is available: report the conversion error
+ return st;
+ }
+ RETURN_NOT_OK(MakeConverter(promoted_type, converter_->pool(), &converter_));  // converter_ now targets the promoted type
+
+ size_t nchunks = chunks_.size();
+ for (size_t i = 0; i < nchunks; ++i) {
+ if (i != block_index && chunks_[i]) {
+ // Every chunk stored so far is assumed to have been converted with the
+ // previous type (true unless the executor reorders tasks), so redo it
+ chunks_[i].reset();
+ lock.unlock();  // mutex_ is not held while scheduling
+ ScheduleConvertChunk(i);
+ lock.lock();
+ }
+ }
+ lock.unlock();
+ ScheduleConvertChunk(block_index);  // finally retry this block with the promoted converter
+ return Status::OK();
+ }
+
+ Status Finish(std::shared_ptr<ChunkedArray>* out) override {
+ RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out));
+ unconverted_.clear();  // note: unconverted_fields_ is not cleared here
+ return Status::OK();
+ }
+
+ private:
+ ArrayVector unconverted_;  // per-block input arrays, indexed by block_index
+ std::vector<std::shared_ptr<Field>> unconverted_fields_;  // per-block fields passed to Promote()
+ const PromotionGraph* promotion_graph_;  // dereferenced without a null check in TryConvertChunk
+};
+
+// Chunked builder for list columns. For every block it keeps the validity
+// bitmap and the offsets buffer, and forwards the list values to a child
+// builder; Finish() reassembles one ListArray per block.
+class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
+ public:
+  ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
+                          std::shared_ptr<ChunkedArrayBuilder> value_builder,
+                          const std::shared_ptr<Field>& value_field)
+      : ChunkedArrayBuilder(task_group),
+        pool_(pool),
+        value_builder_(std::move(value_builder)),
+        value_field_(value_field) {}
+
+  // Finishes the current task group and installs the new one here and in the
+  // value builder.
+  Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
+    RETURN_NOT_OK(task_group_->Finish());
+    RETURN_NOT_OK(value_builder_->ReplaceTaskGroup(task_group));
+    task_group_ = task_group;
+    return Status::OK();
+  }
+
+  // Records the bitmap and offsets of `unconverted` for `block_index` and
+  // inserts its values into the value builder. A block of null type is stored
+  // as a list chunk with an all-zero bitmap and all-zero offsets. Failures are
+  // reported by appending a failing task to the task group.
+  void Insert(int64_t block_index, const std::shared_ptr<Field>&,
+              const std::shared_ptr<Array>& unconverted) override {
+    std::unique_lock<std::mutex> lock(mutex_);
+
+    // Grow the per-block vectors before either branch writes into them:
+    // InsertNull() indexes both vectors at block_index, so resizing only on
+    // the non-null path would write out of bounds for a null block whose
+    // index is not yet covered.
+    if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
+      null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
+      offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
+    }
+
+    if (unconverted->type_id() == Type::NA) {
+      auto st = InsertNull(block_index, unconverted->length());
+      if (!st.ok()) {
+        task_group_->Append([st] { return st; });
+      }
+      return;
+    }
+
+    DCHECK_EQ(unconverted->type_id(), Type::LIST);
+    const auto& list_array = checked_cast<const ListArray&>(*unconverted);
+
+    null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
+    offset_chunks_[block_index] = list_array.value_offsets();
+
+    value_builder_->Insert(block_index, list_array.list_type()->value_field(),
+                           list_array.values());
+  }
+
+  // Waits for all tasks, finishes the value builder and assembles one
+  // ListArray per block from the stored offsets, bitmap and value chunk.
+  Status Finish(std::shared_ptr<ChunkedArray>* out) override {
+    RETURN_NOT_OK(task_group_->Finish());
+
+    std::shared_ptr<ChunkedArray> value_array;
+    RETURN_NOT_OK(value_builder_->Finish(&value_array));
+
+    auto type = list(value_field_->WithType(value_array->type())->WithMetadata(nullptr));
+    ArrayVector chunks(null_bitmap_chunks_.size());
+    for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
+      auto value_chunk = value_array->chunk(static_cast<int>(i));
+      // The offsets buffer holds length + 1 int32 entries.
+      auto length = offset_chunks_[i]->size() / sizeof(int32_t) - 1;
+      chunks[i] = std::make_shared<ListArray>(type, length, offset_chunks_[i],
+                                              value_chunk, null_bitmap_chunks_[i]);
+    }
+
+    *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
+    return Status::OK();
+  }
+
+ private:
+  // call from Insert() only, with mutex_ locked and with both per-block
+  // vectors already sized to cover block_index
+  Status InsertNull(int64_t block_index, int64_t length) {
+    // The value builder gets an empty null chunk for this block.
+    value_builder_->Insert(block_index, value_field_, std::make_shared<NullArray>(0));
+
+    ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_[block_index],
+                          AllocateEmptyBitmap(length, pool_));
+
+    // length + 1 offsets, all zero.
+    int64_t offsets_length = (length + 1) * sizeof(int32_t);
+    ARROW_ASSIGN_OR_RAISE(offset_chunks_[block_index],
+                          AllocateBuffer(offsets_length, pool_));
+    std::memset(offset_chunks_[block_index]->mutable_data(), 0, offsets_length);
+
+    return Status::OK();
+  }
+
+  std::mutex mutex_;
+  MemoryPool* pool_;
+  std::shared_ptr<ChunkedArrayBuilder> value_builder_;
+  BufferVector offset_chunks_, null_bitmap_chunks_;
+  std::shared_ptr<Field> value_field_;
+};
+
+class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {  // chunked builder for struct columns: one child builder per field name
+ public:
+ ChunkedStructArrayBuilder(
+ const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
+ const PromotionGraph* promotion_graph,
+ std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
+ name_builders)
+ : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) {
+ for (auto&& name_builder : name_builders) {
+ auto index = static_cast<int>(name_to_index_.size());  // next free child index
+ name_to_index_.emplace(std::move(name_builder.first), index);
+ child_builders_.emplace_back(std::move(name_builder.second));
+ }
+ }
+
+ void Insert(int64_t block_index, const std::shared_ptr<Field>&,  // records the block's bitmap and length, then inserts its children into the child builders
+ const std::shared_ptr<Array>& unconverted) override {
+ std::unique_lock<std::mutex> lock(mutex_);  // held for the whole call, including the child Insert() calls
+
+ if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
+ null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
+ chunk_lengths_.resize(null_bitmap_chunks_.size(), -1);  // -1 marks a block that has not been inserted
+ child_absent_.resize(null_bitmap_chunks_.size(), std::vector<bool>(0));
+ }
+ null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
+ chunk_lengths_[block_index] = unconverted->length();
+
+ if (unconverted->type_id() == Type::NA) {
+ auto maybe_buffer = AllocateBitmap(unconverted->length(), pool_);
+ if (maybe_buffer.ok()) {
+ null_bitmap_chunks_[block_index] = *std::move(maybe_buffer);
+ std::memset(null_bitmap_chunks_[block_index]->mutable_data(), 0,  // all bits cleared: every slot of a null block is null
+ null_bitmap_chunks_[block_index]->size());
+ } else {
+ Status st = maybe_buffer.status();
+ task_group_->Append([st] { return st; });  // the error is reported through a failing task
+ }
+
+ // no children are inserted for a null block; Finish() fills in absent child chunks
+ return;
+ }
+
+ const auto& struct_array = checked_cast<const StructArray&>(*unconverted);
+ if (promotion_graph_ == nullptr) {
+ // If unexpected fields are ignored or result in an error then all parsers will emit
+ // columns exclusively in the ordering specified in ParseOptions::explicit_schema,
+ // so child_builders_ is immutable and children can be matched by position.
+ for (int i = 0; i < unconverted->num_fields(); ++i) {
+ child_builders_[i]->Insert(block_index, unconverted->type()->field(i),
+ struct_array.field(i));
+ }
+ } else {
+ auto st = InsertChildren(block_index, struct_array);
+ if (!st.ok()) {
+ return task_group_->Append([st] { return st; });
+ }
+ }
+ }
+
+ Status Finish(std::shared_ptr<ChunkedArray>* out) override {  // waits for all tasks, finishes every child and assembles one StructArray per block
+ RETURN_NOT_OK(task_group_->Finish());
+
+ if (promotion_graph_ != nullptr) {
+ // give every child a null chunk for each block in which it did not appear
+ for (auto&& name_index : name_to_index_) {
+ auto child_builder = child_builders_[name_index.second].get();
+
+ RETURN_NOT_OK(child_builder->ReplaceTaskGroup(TaskGroup::MakeSerial()));
+
+ for (size_t i = 0; i < chunk_lengths_.size(); ++i) {
+ if (child_absent_[i].size() > static_cast<size_t>(name_index.second) &&
+ !child_absent_[i][name_index.second]) {
+ continue;  // this child was inserted for block i
+ }
+ auto empty = std::make_shared<NullArray>(chunk_lengths_[i]);
+ child_builder->Insert(i, promotion_graph_->Null(name_index.first), empty);
+ }
+ }
+ }
+
+ std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
+ std::vector<std::shared_ptr<ChunkedArray>> child_arrays(name_to_index_.size());
+ for (auto&& name_index : name_to_index_) {
+ auto child_builder = child_builders_[name_index.second].get();
+
+ std::shared_ptr<ChunkedArray> child_array;
+ RETURN_NOT_OK(child_builder->Finish(&child_array));
+
+ child_arrays[name_index.second] = child_array;  // stored by child index, so field order follows child index
+ fields[name_index.second] = field(name_index.first, child_array->type());
+ }
+
+ auto type = struct_(std::move(fields));
+ ArrayVector chunks(null_bitmap_chunks_.size());
+ for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
+ ArrayVector child_chunks;
+ for (const auto& child_array : child_arrays) {
+ child_chunks.push_back(child_array->chunk(static_cast<int>(i)));
+ }
+ chunks[i] = std::make_shared<StructArray>(type, chunk_lengths_[i], child_chunks,
+ null_bitmap_chunks_[i]);
+ }
+
+ *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
+ return Status::OK();
+ }
+
+ Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {  // finishes the current group, then installs the new one here and in every child
+ RETURN_NOT_OK(task_group_->Finish());
+ for (auto&& child_builder : child_builders_) {
+ RETURN_NOT_OK(child_builder->ReplaceTaskGroup(task_group));
+ }
+ task_group_ = task_group;
+ return Status::OK();
+ }
+
+ private:
+ // Insert children by field name; the unconverted block may contain fields this
+ // builder has not seen yet, or fields in a different order.
+ // call from Insert() only, with mutex_ locked
+ Status InsertChildren(int64_t block_index, const StructArray& unconverted) {
+ const auto& fields = unconverted.type()->fields();
+
+ for (int i = 0; i < unconverted.num_fields(); ++i) {
+ auto it = name_to_index_.find(fields[i]->name());
+
+ if (it == name_to_index_.end()) {
+ // first occurrence of this field name: create a child builder for it
+ auto type = promotion_graph_->Infer(fields[i]);
+ DCHECK_NE(type, nullptr)
+ << "invalid unconverted_field encountered in conversion: "
+ << fields[i]->name() << ":" << *fields[i]->type();
+
+ auto new_index = static_cast<int>(name_to_index_.size());
+ it = name_to_index_.emplace(fields[i]->name(), new_index).first;
+
+ std::shared_ptr<ChunkedArrayBuilder> child_builder;
+ RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
+ &child_builder));
+ child_builders_.emplace_back(std::move(child_builder));
+ }
+
+ auto unconverted_field = unconverted.type()->field(i);
+ child_builders_[it->second]->Insert(block_index, unconverted_field,
+ unconverted.field(i));
+
+ child_absent_[block_index].resize(child_builders_.size(), true);  // newly added entries default to absent
+ child_absent_[block_index][it->second] = false;  // this child is present in this block
+ }
+
+ return Status::OK();
+ }
+
+ std::mutex mutex_;  // locked in Insert(); Finish() and ReplaceTaskGroup() do not lock it
+ MemoryPool* pool_;
+ const PromotionGraph* promotion_graph_;  // nullptr selects positional child matching in Insert()
+ std::unordered_map<std::string, int> name_to_index_;  // field name -> index into child_builders_
+ std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_;
+ std::vector<std::vector<bool>> child_absent_;  // [block][child]: true when the child was not inserted for that block
+ BufferVector null_bitmap_chunks_;  // per-block validity bitmaps
+ std::vector<int64_t> chunk_lengths_;  // per-block lengths
+};
+
+// Creates the chunked builder for `type`. Struct and list types get nested
+// builders constructed recursively from their children; every other type gets
+// a converter-based builder, which is an inferring one when a promotion graph
+// is supplied.
+Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
+                               MemoryPool* pool, const PromotionGraph* promotion_graph,
+                               const std::shared_ptr<DataType>& type,
+                               std::shared_ptr<ChunkedArrayBuilder>* out) {
+  switch (type->id()) {
+    case Type::STRUCT: {
+      std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
+          name_builders;
+      for (const auto& child_field : type->fields()) {
+        std::shared_ptr<ChunkedArrayBuilder> builder;
+        RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
+                                              child_field->type(), &builder));
+        name_builders.emplace_back(child_field->name(), std::move(builder));
+      }
+      *out = std::make_shared<ChunkedStructArrayBuilder>(
+          task_group, pool, promotion_graph, std::move(name_builders));
+      return Status::OK();
+    }
+    case Type::LIST: {
+      const auto& list_type = checked_cast<const ListType&>(*type);
+      std::shared_ptr<ChunkedArrayBuilder> values;
+      RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
+                                            list_type.value_type(), &values));
+      *out = std::make_shared<ChunkedListArrayBuilder>(task_group, pool, std::move(values),
+                                                       list_type.value_field());
+      return Status::OK();
+    }
+    default:
+      break;
+  }
+
+  // Non-nested type: build on top of a converter for that type.
+  std::shared_ptr<Converter> converter;
+  RETURN_NOT_OK(MakeConverter(type, pool, &converter));
+  if (promotion_graph != nullptr) {
+    *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
+                                                          std::move(converter));
+    return Status::OK();
+  }
+  *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter));
+  return Status::OK();
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h
new file mode 100644
index 00000000000..93b327bf3ae
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+
+class PromotionGraph;
+
+/// Abstract interface for builders that accumulate converted JSON blocks and
+/// finally expose them as a single ChunkedArray.
+///
+/// Concrete builders are obtained through MakeChunkedArrayBuilder(); the
+/// constructor is protected so only subclasses can instantiate the base.
+class ARROW_EXPORT ChunkedArrayBuilder {
+ public:
+ virtual ~ChunkedArrayBuilder() = default;
+
+ /// Spawn a task that will try to convert and insert the given JSON block
+ ///
+ /// \param block_index index identifying the block being inserted
+ /// \param unconverted_field field describing the unconverted array
+ /// \param unconverted the not-yet-converted array for this block
+ virtual void Insert(int64_t block_index,
+ const std::shared_ptr<Field>& unconverted_field,
+ const std::shared_ptr<Array>& unconverted) = 0;
+
+ /// Return the final chunked array.
+ /// Every chunk must be inserted before this is called!
+ ///
+ /// \param[out] out receives the assembled ChunkedArray
+ virtual Status Finish(std::shared_ptr<ChunkedArray>* out) = 0;
+
+ /// Finish current task group and substitute a new one
+ ///
+ /// \param task_group the task group to use from now on
+ virtual Status ReplaceTaskGroup(
+ const std::shared_ptr<arrow::internal::TaskGroup>& task_group) = 0;
+
+ protected:
+ /// Store the task group for use by subclasses.
+ explicit ChunkedArrayBuilder(
+ const std::shared_ptr<arrow::internal::TaskGroup>& task_group)
+ : task_group_(task_group) {}
+
+ /// Task group supplied at construction.
+ std::shared_ptr<arrow::internal::TaskGroup> task_group_;
+};
+
+/// \brief Create a chunked builder for the given type.
+///
+/// If unexpected fields and promotion need to be handled, promotion_graph must be
+/// non-null.
+///
+/// \param task_group task group handed to the created builder(s)
+/// \param pool memory pool handed to the created builder(s) and converters
+/// \param promotion_graph promotion rules, or null to disable promotion
+/// \param type the type the builder should produce
+/// \param[out] out receives the created builder
+/// \return Status::OK() on success, otherwise the error that prevented creation
+ARROW_EXPORT Status MakeChunkedArrayBuilder(
+ const std::shared_ptr<arrow::internal::TaskGroup>& task_group, MemoryPool* pool,
+ const PromotionGraph* promotion_graph, const std::shared_ptr<DataType>& type,
+ std::shared_ptr<ChunkedArrayBuilder>* out);
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc
new file mode 100644
index 00000000000..b4b4d31eb94
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/chunker.h"
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
+#include "arrow/json/rapidjson_defs.h"
+#include "rapidjson/reader.h"
+
+#include "arrow/buffer.h"
+#include "arrow/json/options.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+
+using internal::make_unique;
+using util::string_view;
+
+namespace json {
+
+namespace rj = arrow::rapidjson;
+
+/// Return how many leading characters of `view` are JSON whitespace
+/// (space, tab, carriage return or newline). Returns view.size() when the
+/// whole view is whitespace.
+static size_t ConsumeWhitespace(string_view view) {
+#ifdef RAPIDJSON_SIMD
+  const char* begin = view.data();
+  const char* first_non_ws = rj::SkipWhitespace_SIMD(begin, begin + view.size());
+  return first_non_ws - begin;
+#else
+  const auto first_non_ws = view.find_first_not_of(" \t\r\n");
+  return first_non_ws == string_view::npos ? view.size() : first_non_ws;
+#endif
+}
+
+/// RapidJSON custom input stream that reads JSON stored in multiple buffers as
+/// if they formed one contiguous string.
+/// http://rapidjson.org/md_doc_stream.html#CustomStream
+///
+/// Zero-length buffers are discarded at construction, so every view kept in
+/// `strings_` is non-empty and Peek()/Take() never index into an empty view.
+/// The output half of the stream concept (Put/Flush/PutBegin/PutEnd) is not
+/// supported; those members only log at FATAL level.
+class MultiStringStream {
+ public:
+  using Ch = char;
+  explicit MultiStringStream(std::vector<string_view> strings)
+      : strings_(std::move(strings)) {
+    DropEmptyAndReverse();
+  }
+  explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
+    for (size_t i = 0; i < buffers.size(); ++i) {
+      strings_[i] = string_view(*buffers[i]);
+    }
+    DropEmptyAndReverse();
+  }
+  /// Return the next character without consuming it, or '\0' when exhausted.
+  char Peek() const {
+    if (strings_.empty()) return '\0';
+    return strings_.back()[0];
+  }
+  /// Consume and return the next character, or '\0' when exhausted.
+  char Take() {
+    if (strings_.empty()) return '\0';
+    char taken = strings_.back()[0];
+    if (strings_.back().size() == 1) {
+      // Last character of the current view: move on to the next view.
+      strings_.pop_back();
+    } else {
+      strings_.back() = strings_.back().substr(1);
+    }
+    ++index_;
+    return taken;
+  }
+  /// Number of characters consumed so far.
+  size_t Tell() const { return index_; }
+  void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
+  void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
+  char* PutBegin() {
+    ARROW_LOG(FATAL) << "not implemented";
+    return nullptr;
+  }
+  size_t PutEnd(char*) {
+    ARROW_LOG(FATAL) << "not implemented";
+    return 0;
+  }
+
+ private:
+  /// Remove empty views, then reverse so that the next view to read sits at
+  /// the back of the vector and can be dropped with pop_back().
+  void DropEmptyAndReverse() {
+    strings_.erase(std::remove_if(strings_.begin(), strings_.end(),
+                                  [](const string_view& s) { return s.empty(); }),
+                   strings_.end());
+    std::reverse(strings_.begin(), strings_.end());
+  }
+
+  size_t index_ = 0;
+  std::vector<string_view> strings_;
+};
+
+/// Parse a single JSON value from `stream` and report how much input it used.
+///
+/// \return the stream position after the value on success, 0 when the
+/// document is empty, and string_view::npos for any other parse error (the
+/// most recent object was partial).
+template <typename Stream>
+static size_t ConsumeWholeObject(Stream&& stream) {
+  static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
+                                          rj::kParseStopWhenDoneFlag |
+                                          rj::kParseNumbersAsStringsFlag;
+  rj::BaseReaderHandler<rj::UTF8<>> handler;
+  rj::Reader reader;
+  const auto error_code = reader.Parse<parse_flags>(stream, handler).Code();
+  if (error_code == rj::kParseErrorNone) {
+    return stream.Tell();
+  }
+  if (error_code == rj::kParseErrorDocumentEmpty) {
+    return 0;
+  }
+  // rapidjson emitted an error, the most recent object was partial
+  return string_view::npos;
+}
+
+namespace {
+
+// BoundaryFinder for JSON whose objects may contain raw newlines: object
+// boundaries are located by actually parsing the JSON rather than by scanning
+// for line breaks.
+class ParsingBoundaryFinder : public BoundaryFinder {
+ public:
+  // Locate the end of the object that starts in `partial` and finishes in
+  // `block`. *out_pos is the offset inside `block`, or -1 if no complete object
+  // was found.
+  Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override {
+    // NOTE: We could bubble up JSON parse errors here, but the actual parsing
+    // step will detect them later anyway.
+    const auto end_offset = ConsumeWholeObject(MultiStringStream({partial, block}));
+    if (end_offset == string_view::npos) {
+      *out_pos = -1;
+      return Status::OK();
+    }
+    DCHECK_GE(end_offset, partial.size());
+    DCHECK_LE(end_offset, partial.size() + block.size());
+    *out_pos = static_cast<int64_t>(end_offset - partial.size());
+    return Status::OK();
+  }
+
+  // Locate the end of the last complete object in `block`, including any
+  // whitespace that follows it. *out_pos is -1 if no complete object was found.
+  Status FindLast(util::string_view block, int64_t* out_pos) override {
+    using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
+    const size_t total_length = block.size();
+    size_t consumed = 0;
+    while (consumed < total_length) {
+      rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size());
+      const auto object_length = ConsumeWholeObject(InputStream(ms));
+      if (object_length == 0 || object_length == string_view::npos) {
+        // block is empty or found incomplete object
+        break;
+      }
+      consumed += object_length;
+      block = block.substr(object_length);
+    }
+    if (consumed == 0) {
+      *out_pos = -1;
+      return Status::OK();
+    }
+    consumed += ConsumeWhitespace(block);
+    DCHECK_LE(consumed, total_length);
+    *out_pos = static_cast<int64_t>(consumed);
+    return Status::OK();
+  }
+
+  // Not supported by this finder.
+  Status FindNth(util::string_view partial, util::string_view block, int64_t count,
+                 int64_t* out_pos, int64_t* num_found) override {
+    return Status::NotImplemented("ParsingBoundaryFinder::FindNth");
+  }
+};
+
+} // namespace
+
+/// Create a Chunker whose boundary finder matches `options`: a JSON-parsing
+/// finder when values may contain newlines, the newline-based finder otherwise.
+std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
+  if (options.newlines_in_values) {
+    std::shared_ptr<BoundaryFinder> parsing_finder =
+        std::make_shared<ParsingBoundaryFinder>();
+    return std::unique_ptr<Chunker>(new Chunker(std::move(parsing_finder)));
+  }
+  std::shared_ptr<BoundaryFinder> newline_finder = MakeNewlineBoundaryFinder();
+  return std::unique_ptr<Chunker>(new Chunker(std::move(newline_finder)));
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h
new file mode 100644
index 00000000000..9ed85126da1
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/util/delimiting.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+
+struct ParseOptions;
+
+/// \brief Create a Chunker configured from the given JSON parse options.
+///
+/// \param options parse options consulted when choosing how blocks are delimited
+/// \return a newly allocated Chunker owned by the caller
+ARROW_EXPORT
+std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options);
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc
new file mode 100644
index 00000000000..fe9500d40ca
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/converter.h"
+
+#include <memory>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_time.h"
+#include "arrow/json/parser.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/value_parsing.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+using util::string_view;
+
+namespace json {
+
+/// Build the Invalid status reported when conversion of JSON to `type` fails.
+/// Any extra arguments are appended to the message after the type.
+template <typename... Args>
+Status GenericConversionError(const DataType& type, Args&&... args) {
+  Status failure =
+      Status::Invalid("Failed of conversion of JSON to ", type, std::forward<Args>(args)...);
+  return failure;
+}
+
+namespace {
+
+// View `in` as a DictionaryArray. Debug builds additionally check that it is a
+// dictionary with int32 indices and string values.
+const DictionaryArray& GetDictionaryArray(const std::shared_ptr<Array>& in) {
+  DCHECK_EQ(in->type_id(), Type::DICTIONARY);
+  const DictionaryType* as_dictionary_type =
+      checked_cast<const DictionaryType*>(in->type().get());
+  DCHECK_EQ(as_dictionary_type->index_type()->id(), Type::INT32);
+  DCHECK_EQ(as_dictionary_type->value_type()->id(), Type::STRING);
+  const Array& array = *in;
+  return checked_cast<const DictionaryArray&>(array);
+}
+
+// Walk the indices of `dict_array` in order. For each non-null slot call
+// `visit_valid` with the dictionary string it refers to; for each null slot
+// call `visit_null`. Stops at, and returns, the first non-OK status.
+template <typename ValidVisitor, typename NullVisitor>
+Status VisitDictionaryEntries(const DictionaryArray& dict_array,
+                              ValidVisitor&& visit_valid, NullVisitor&& visit_null) {
+  const auto& dictionary = checked_cast<const StringArray&>(*dict_array.dictionary());
+  const auto& positions = checked_cast<const Int32Array&>(*dict_array.indices());
+  const int64_t slot_count = positions.length();
+  for (int64_t slot = 0; slot < slot_count; ++slot) {
+    if (positions.IsNull(slot)) {
+      RETURN_NOT_OK(visit_null());
+      continue;
+    }
+    RETURN_NOT_OK(visit_valid(dictionary.GetView(positions.GetView(slot))));
+  }
+  return Status::OK();
+}
+
+} // namespace
+
+// Base class for converters which accept and output non-nested types.
+class PrimitiveConverter : public Converter {
+ public:
+  // The output type is taken by const reference: the Converter base copies it
+  // into its own member, so a by-value parameter here only added a redundant
+  // shared_ptr copy.
+  PrimitiveConverter(MemoryPool* pool, const std::shared_ptr<DataType>& out_type)
+      : Converter(pool, out_type) {}
+};
+
+// Converter for the null type: null arrays pass through unchanged, anything
+// else is a conversion error.
+class NullConverter : public PrimitiveConverter {
+ public:
+  using PrimitiveConverter::PrimitiveConverter;
+
+  Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override {
+    if (in->type_id() == Type::NA) {
+      *out = in;
+      return Status::OK();
+    }
+    return GenericConversionError(*out_type_, " from ", *in->type());
+  }
+};
+
+// Converter for booleans: null input becomes an all-null boolean array,
+// boolean input passes through unchanged, anything else is a conversion error.
+class BooleanConverter : public PrimitiveConverter {
+ public:
+  using PrimitiveConverter::PrimitiveConverter;
+
+  Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override {
+    switch (in->type_id()) {
+      case Type::NA:
+        return MakeArrayOfNull(boolean(), in->length(), pool_).Value(out);
+      case Type::BOOL:
+        *out = in;
+        return Status::OK();
+      default:
+        return GenericConversionError(*out_type_, " from ", *in->type());
+    }
+  }
+};
+
+// Converter that parses the string representations held in a dictionary array
+// into values of the numeric type T. Null input becomes an all-null array of
+// the output type; an unparseable entry is a conversion error.
+template <typename T>
+class NumericConverter : public PrimitiveConverter {
+ public:
+  using value_type = typename T::c_type;
+
+  NumericConverter(MemoryPool* pool, const std::shared_ptr<DataType>& type)
+      : PrimitiveConverter(pool, type), numeric_type_(checked_cast<const T&>(*type)) {}
+
+  Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override {
+    if (in->type_id() == Type::NA) {
+      return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out);
+    }
+    const auto& source = GetDictionaryArray(in);
+
+    using Builder = typename TypeTraits<T>::BuilderType;
+    Builder builder(out_type_, pool_);
+    // Capacity for every slot is allocated up front, so the unchecked appends
+    // below need no further allocation.
+    RETURN_NOT_OK(builder.Resize(source.indices()->length()));
+
+    auto append_parsed = [&](string_view repr) -> Status {
+      value_type parsed;
+      if (arrow::internal::ParseValue(numeric_type_, repr.data(), repr.size(), &parsed)) {
+        builder.UnsafeAppend(parsed);
+        return Status::OK();
+      }
+      return GenericConversionError(*out_type_, ", couldn't parse:", repr);
+    };
+
+    auto append_null = [&]() -> Status {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    };
+
+    RETURN_NOT_OK(VisitDictionaryEntries(source, append_parsed, append_null));
+    return builder.Finish(out);
+  }
+
+  const T& numeric_type_;
+};
+
+// Converter for date/time types. Values are first parsed with a
+// NumericConverter for the matching integer representation type, and the
+// resulting array data is then relabelled with the requested output type.
+template <typename DateTimeType>
+class DateTimeConverter : public PrimitiveConverter {
+ public:
+  DateTimeConverter(MemoryPool* pool, const std::shared_ptr<DataType>& type)
+      : PrimitiveConverter(pool, type), converter_(pool, repr_type()) {}
+
+  Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override {
+    if (in->type_id() == Type::NA) {
+      return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out);
+    }
+
+    std::shared_ptr<Array> as_numeric;
+    RETURN_NOT_OK(converter_.Convert(in, &as_numeric));
+
+    // Copy the array data and swap in the date/time type.
+    auto relabelled = as_numeric->data()->Copy();
+    relabelled->type = out_type_;
+    *out = MakeArray(relabelled);
+
+    return Status::OK();
+  }
+
+ private:
+  using ReprType = typename CTypeTraits<typename DateTimeType::c_type>::ArrowType;
+  static std::shared_ptr<DataType> repr_type() {
+    return TypeTraits<ReprType>::type_singleton();
+  }
+  NumericConverter<ReprType> converter_;
+};
+
+// Converter that copies the strings held in a dictionary array into a
+// binary-like array of type T. Null input becomes an all-null array of the
+// output type.
+template <typename T>
+class BinaryConverter : public PrimitiveConverter {
+ public:
+  using PrimitiveConverter::PrimitiveConverter;
+
+  Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override {
+    if (in->type_id() == Type::NA) {
+      return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out);
+    }
+    const auto& source = GetDictionaryArray(in);
+
+    using Builder = typename TypeTraits<T>::BuilderType;
+    Builder builder(out_type_, pool_);
+    RETURN_NOT_OK(builder.Resize(source.indices()->length()));
+
+    // First pass: total the byte length of all valid entries so the value
+    // buffer can be reserved once.
+    // TODO(bkietz) this can be computed during parsing at low cost
+    int64_t total_bytes = 0;
+    auto count_bytes = [&](string_view value) -> Status {
+      total_bytes += value.size();
+      return Status::OK();
+    };
+    auto skip_null = [&]() -> Status { return Status::OK(); };
+
+    RETURN_NOT_OK(VisitDictionaryEntries(source, count_bytes, skip_null));
+    RETURN_NOT_OK(builder.ReserveData(total_bytes));
+
+    // Second pass: unchecked appends, all storage having been reserved above.
+    auto append_value = [&](string_view value) -> Status {
+      builder.UnsafeAppend(value);
+      return Status::OK();
+    };
+    auto append_null = [&]() -> Status {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    };
+
+    RETURN_NOT_OK(VisitDictionaryEntries(source, append_value, append_null));
+    return builder.Finish(out);
+  }
+};
+
+// Create the converter that produces `out_type`. Returns NotImplemented for
+// types without a converter.
+Status MakeConverter(const std::shared_ptr<DataType>& out_type, MemoryPool* pool,
+                     std::shared_ptr<Converter>* out) {
+  // Each supported case constructs its converter and returns immediately;
+  // unsupported types fall out of the switch.
+#define MAKE_CONVERTER_AND_RETURN(CONVERTER_TYPE)           \
+  *out = std::make_shared<CONVERTER_TYPE>(pool, out_type); \
+  return Status::OK()
+  switch (out_type->id()) {
+    case Type::NA:
+      MAKE_CONVERTER_AND_RETURN(NullConverter);
+    case Type::BOOL:
+      MAKE_CONVERTER_AND_RETURN(BooleanConverter);
+    case Type::INT8:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<Int8Type>);
+    case Type::INT16:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<Int16Type>);
+    case Type::INT32:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<Int32Type>);
+    case Type::INT64:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<Int64Type>);
+    case Type::UINT8:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<UInt8Type>);
+    case Type::UINT16:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<UInt16Type>);
+    case Type::UINT32:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<UInt32Type>);
+    case Type::UINT64:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<UInt64Type>);
+    case Type::FLOAT:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<FloatType>);
+    case Type::DOUBLE:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<DoubleType>);
+    case Type::TIMESTAMP:
+      MAKE_CONVERTER_AND_RETURN(NumericConverter<TimestampType>);
+    case Type::TIME32:
+      MAKE_CONVERTER_AND_RETURN(DateTimeConverter<Time32Type>);
+    case Type::TIME64:
+      MAKE_CONVERTER_AND_RETURN(DateTimeConverter<Time64Type>);
+    case Type::DATE32:
+      MAKE_CONVERTER_AND_RETURN(DateTimeConverter<Date32Type>);
+    case Type::DATE64:
+      MAKE_CONVERTER_AND_RETURN(DateTimeConverter<Date64Type>);
+    case Type::BINARY:
+      MAKE_CONVERTER_AND_RETURN(BinaryConverter<BinaryType>);
+    case Type::STRING:
+      MAKE_CONVERTER_AND_RETURN(BinaryConverter<StringType>);
+    case Type::LARGE_BINARY:
+      MAKE_CONVERTER_AND_RETURN(BinaryConverter<LargeBinaryType>);
+    case Type::LARGE_STRING:
+      MAKE_CONVERTER_AND_RETURN(BinaryConverter<LargeStringType>);
+    default:
+      break;
+  }
+#undef MAKE_CONVERTER_AND_RETURN
+  return Status::NotImplemented("JSON conversion to ", *out_type, " is not supported");
+}
+
+// Return the built-in promotion graph, a single function-local static instance.
+//
+// Inference: null -> null, boolean -> boolean, number -> int64,
+// string -> timestamp(SECOND); arrays and objects are inferred recursively.
+// Promotion on failure: null -> inferred type, timestamp -> utf8,
+// int64 -> float64; any other type has no promotion (nullptr).
+const PromotionGraph* GetPromotionGraph() {
+  static struct : PromotionGraph {
+    std::shared_ptr<Field> Null(const std::string& name) const override {
+      return field(name, null(), true, Kind::Tag(Kind::kNull));
+    }
+
+    std::shared_ptr<DataType> Infer(
+        const std::shared_ptr<Field>& unexpected_field) const override {
+      const auto json_kind = Kind::FromTag(unexpected_field->metadata());
+      switch (json_kind) {
+        case Kind::kNull:
+          return null();
+        case Kind::kBoolean:
+          return boolean();
+        case Kind::kNumber:
+          return int64();
+        case Kind::kString:
+          return timestamp(TimeUnit::SECOND);
+        case Kind::kArray: {
+          // Infer the element type and rebuild the list around it.
+          const auto& list_type =
+              checked_cast<const ListType&>(*unexpected_field->type());
+          auto item_field = list_type.value_field();
+          return list(item_field->WithType(Infer(item_field)));
+        }
+        case Kind::kObject: {
+          // Infer every child on a copy of the field list.
+          auto inferred_fields = unexpected_field->type()->fields();
+          for (auto& child : inferred_fields) {
+            child = child->WithType(Infer(child));
+          }
+          return struct_(std::move(inferred_fields));
+        }
+        default:
+          return nullptr;
+      }
+    }
+
+    std::shared_ptr<DataType> Promote(
+        const std::shared_ptr<DataType>& failed,
+        const std::shared_ptr<Field>& unexpected_field) const override {
+      const auto failed_id = failed->id();
+      if (failed_id == Type::NA) {
+        return Infer(unexpected_field);
+      }
+      if (failed_id == Type::TIMESTAMP) {
+        return utf8();
+      }
+      if (failed_id == Type::INT64) {
+        return float64();
+      }
+      return nullptr;
+    }
+  } impl;
+
+  return &impl;
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h
new file mode 100644
index 00000000000..9a812dd3c3a
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class DataType;
+class Field;
+class MemoryPool;
+
+namespace json {
+
+/// \brief interface for conversion of Arrays
+///
+/// Converters are not required to be correct for arbitrary input- only
+/// for unconverted arrays emitted by a corresponding parser.
+///
+/// A converter stores the memory pool and the output type it was constructed
+/// with. It is non-copyable and can only be constructed by subclasses.
+class ARROW_EXPORT Converter {
+ public:
+ virtual ~Converter() = default;
+
+ /// convert an array
+ /// on failure, this converter may be promoted to another converter which
+ /// *can* convert the given input.
+ ///
+ /// \param in the unconverted array
+ /// \param[out] out receives the converted array on success
+ virtual Status Convert(const std::shared_ptr<Array>& in,
+ std::shared_ptr<Array>* out) = 0;
+
+ /// The output type this converter was constructed with.
+ std::shared_ptr<DataType> out_type() const { return out_type_; }
+
+ /// The memory pool this converter was constructed with.
+ MemoryPool* pool() { return pool_; }
+
+ protected:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(Converter);
+
+ /// Store the pool and output type for use by subclasses.
+ Converter(MemoryPool* pool, const std::shared_ptr<DataType>& out_type)
+ : pool_(pool), out_type_(out_type) {}
+
+ MemoryPool* pool_;
+ std::shared_ptr<DataType> out_type_;
+};
+
+/// \brief produce a single converter to the specified out_type
+///
+/// \param out_type the type the converter should output
+/// \param pool memory pool handed to the converter
+/// \param[out] out receives the created converter
+/// \return Status::OK() on success, or an error if no converter could be created
+ARROW_EXPORT Status MakeConverter(const std::shared_ptr<DataType>& out_type,
+ MemoryPool* pool, std::shared_ptr<Converter>* out);
+
+/// \brief interface describing how types are inferred for unexpected fields
+/// and which type to try next when a conversion fails
+///
+/// Instances are non-copyable and can only be constructed by subclasses.
+class ARROW_EXPORT PromotionGraph {
+ public:
+ virtual ~PromotionGraph() = default;
+
+ /// \brief produce a valid field which will be inferred as null
+ ///
+ /// \param name the name of the produced field
+ virtual std::shared_ptr<Field> Null(const std::string& name) const = 0;
+
+ /// \brief given an unexpected field encountered during parsing, return a type to which
+ /// it may be convertible (may return null if none is available)
+ virtual std::shared_ptr<DataType> Infer(
+ const std::shared_ptr<Field>& unexpected_field) const = 0;
+
+ /// \brief given a type to which conversion failed, return a promoted type to which
+ /// conversion may succeed (may return null if none is available)
+ ///
+ /// \param failed the type to which conversion failed
+ /// \param unexpected_field the field whose conversion failed
+ virtual std::shared_ptr<DataType> Promote(
+ const std::shared_ptr<DataType>& failed,
+ const std::shared_ptr<Field>& unexpected_field) const = 0;
+
+ protected:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(PromotionGraph);
+ PromotionGraph() = default;
+};
+
+/// \brief return the built-in promotion graph
+///
+/// The returned pointer is not owned by the caller.
+ARROW_EXPORT const PromotionGraph* GetPromotionGraph();
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc
new file mode 100644
index 00000000000..c857cd537e7
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/object_parser.h"
+#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep
+
+#include <rapidjson/document.h>
+
+namespace arrow {
+namespace json {
+namespace internal {
+
+namespace rj = arrow::rapidjson;
+
+// Implementation of ObjectParser on top of a rapidjson Document.
+class ObjectParser::Impl {
+ public:
+  // Parse `json` into the held document.
+  // Returns Invalid on a parse error and TypeError if the top-level value is
+  // not a JSON object.
+  Status Parse(arrow::util::string_view json) {
+    document_.Parse(reinterpret_cast<const rj::Document::Ch*>(json.data()),
+                    static_cast<size_t>(json.size()));
+
+    if (document_.HasParseError()) {
+      return Status::Invalid("Json parse error (offset ", document_.GetErrorOffset(),
+                             "): ", document_.GetParseError());
+    }
+    if (!document_.IsObject()) {
+      return Status::TypeError("Not a json object");
+    }
+    return Status::OK();
+  }
+
+  // Return the string stored under `key`.
+  // Returns KeyError if the key is absent and TypeError if the value is not a
+  // string.
+  Result<std::string> GetString(const char* key) const {
+    // A single lookup replaces the former HasMember + operator[] sequence.
+    const auto member = document_.FindMember(key);
+    if (member == document_.MemberEnd()) {
+      return Status::KeyError("Key '", key, "' does not exist");
+    }
+    const auto& value = member->value;
+    if (!value.IsString()) {
+      return Status::TypeError("Key '", key, "' is not a string");
+    }
+    // Use the stored length so embedded NUL characters are not truncated.
+    return std::string(value.GetString(), value.GetStringLength());
+  }
+
+  // Return the boolean stored under `key`.
+  // Returns KeyError if the key is absent and TypeError if the value is not a
+  // boolean.
+  Result<bool> GetBool(const char* key) const {
+    const auto member = document_.FindMember(key);
+    if (member == document_.MemberEnd()) {
+      return Status::KeyError("Key '", key, "' does not exist");
+    }
+    const auto& value = member->value;
+    if (!value.IsBool()) {
+      return Status::TypeError("Key '", key, "' is not a boolean");
+    }
+    return value.GetBool();
+  }
+
+ private:
+  rj::Document document_;
+};
+
+// Allocate the implementation object that holds the parsed document.
+ObjectParser::ObjectParser() : impl_(new ObjectParser::Impl()) {}
+
+// Defined in this file, where Impl is a complete type.
+ObjectParser::~ObjectParser() = default;
+
+// The public members below forward directly to the implementation object.
+Status ObjectParser::Parse(arrow::util::string_view json) { return impl_->Parse(json); }
+
+Result<std::string> ObjectParser::GetString(const char* key) const {
+ return impl_->GetString(key);
+}
+
+Result<bool> ObjectParser::GetBool(const char* key) const { return impl_->GetBool(key); }
+
+} // namespace internal
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h
new file mode 100644
index 00000000000..ef93201651a
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/result.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+namespace internal {
+
+/// This class is a helper to parse a json object from a string.
+/// It uses rapidjson::Document in implementation.
+///
+/// The implementation is hidden behind a forward-declared Impl class, so this
+/// header does not expose rapidjson types.
+class ARROW_EXPORT ObjectParser {
+ public:
+ ObjectParser();
+ ~ObjectParser();
+
+ /// Parse the given json text; the getters below read from the parsed result.
+ Status Parse(arrow::util::string_view json);
+
+ /// Return the string value stored under the given key.
+ Result<std::string> GetString(const char* key) const;
+ /// Return the boolean value stored under the given key.
+ Result<bool> GetBool(const char* key) const;
+
+ private:
+ class Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace internal
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc
new file mode 100644
index 00000000000..06d09f81e94
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/object_writer.h"
+#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep
+
+#include <rapidjson/document.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+namespace rj = arrow::rapidjson;
+
+namespace arrow {
+namespace json {
+namespace internal {
+
+// Implementation of ObjectWriter: members are collected in a rapidjson object
+// value (`root_`) whose storage comes from the allocator of `document_`.
+class ObjectWriter::Impl {
+ public:
+  Impl() : root_(rj::kObjectType) {}
+
+  // Add a string member. Key and value are copied using their explicit
+  // lengths: a string_view is not guaranteed to be NUL-terminated, so relying
+  // on strlen could read past the view or stop early at an embedded NUL.
+  void SetString(arrow::util::string_view key, arrow::util::string_view value) {
+    rj::Document::AllocatorType& allocator = document_.GetAllocator();
+
+    rj::Value str_key(DataOrEmpty(key), static_cast<rj::SizeType>(key.size()),
+                      allocator);
+    rj::Value str_value(DataOrEmpty(value), static_cast<rj::SizeType>(value.size()),
+                        allocator);
+
+    root_.AddMember(str_key, str_value, allocator);
+  }
+
+  // Add a boolean member; the key is copied using its explicit length.
+  void SetBool(arrow::util::string_view key, bool value) {
+    rj::Document::AllocatorType& allocator = document_.GetAllocator();
+
+    rj::Value str_key(DataOrEmpty(key), static_cast<rj::SizeType>(key.size()),
+                      allocator);
+
+    root_.AddMember(str_key, value, allocator);
+  }
+
+  // Serialize the collected members as a JSON object.
+  std::string Serialize() {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    root_.Accept(writer);
+
+    // Build the result from pointer + size rather than relying on strlen.
+    return std::string(buffer.GetString(), buffer.GetSize());
+  }
+
+ private:
+  // An empty view may carry a null data pointer; hand rapidjson a valid
+  // pointer in that case.
+  static const char* DataOrEmpty(arrow::util::string_view text) {
+    return text.empty() ? "" : text.data();
+  }
+
+  rj::Document document_;
+  rj::Value root_;
+};
+
+// Allocate the implementation object that collects the members.
+ObjectWriter::ObjectWriter() : impl_(new ObjectWriter::Impl()) {}
+
+// Defined in this file, where Impl is a complete type.
+ObjectWriter::~ObjectWriter() = default;
+
+// The public members below forward directly to the implementation object.
+void ObjectWriter::SetString(arrow::util::string_view key,
+ arrow::util::string_view value) {
+ impl_->SetString(key, value);
+}
+
+void ObjectWriter::SetBool(arrow::util::string_view key, bool value) {
+ impl_->SetBool(key, value);
+}
+
+std::string ObjectWriter::Serialize() { return impl_->Serialize(); }
+
+} // namespace internal
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h
new file mode 100644
index 00000000000..55ff0ce52bc
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+namespace internal {
+
+/// Helper that serializes a JSON object to a string.
+///
+/// Members are added with the Set* methods and the result is produced by
+/// Serialize(). The implementation (which uses rapidjson) is hidden behind
+/// the forward-declared Impl class.
+class ARROW_EXPORT ObjectWriter {
+ public:
+ ObjectWriter();
+ ~ObjectWriter();
+
+ /// Add a member named `key` with the string `value`.
+ void SetString(arrow::util::string_view key, arrow::util::string_view value);
+ /// Add a member named `key` with the boolean `value`.
+ void SetBool(arrow::util::string_view key, bool value);
+
+ /// Return the serialized form of the object built so far.
+ std::string Serialize();
+
+ private:
+ // Defined in the corresponding source file.
+ class Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace internal
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc
new file mode 100644
index 00000000000..dc5e628b1f3
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/options.h"
+
+namespace arrow {
+namespace json {
+
+// Returns a default-constructed ParseOptions.
+ParseOptions ParseOptions::Defaults() { return ParseOptions(); }
+
+// Returns a default-constructed ReadOptions.
+ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h
new file mode 100644
index 00000000000..d7edab9cedd
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/json/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class DataType;
+class Schema;
+
+namespace json {
+
+/// Policy applied to unexpected JSON fields
+/// (selected through ParseOptions::unexpected_field_behavior).
+enum class UnexpectedFieldBehavior : char {
+ /// Unexpected JSON fields are ignored
+ Ignore,
+ /// Unexpected JSON fields error out
+ Error,
+ /// Unexpected JSON fields are type-inferred and included in the output
+ InferType
+};
+
+/// Options controlling how JSON text is parsed.
+struct ARROW_EXPORT ParseOptions {
+ // Parsing options
+
+ /// Optional explicit schema (disables type inference on those fields)
+ std::shared_ptr<Schema> explicit_schema;
+
+ /// Whether objects may be printed across multiple lines (for example pretty-printed)
+ ///
+ /// If true, parsing may be slower.
+ bool newlines_in_values = false;
+
+ /// How JSON fields outside of explicit_schema (if given) are treated
+ ///
+ /// Defaults to UnexpectedFieldBehavior::InferType.
+ UnexpectedFieldBehavior unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+ /// Create parsing options with default values
+ static ParseOptions Defaults();
+};
+
+/// Options controlling how JSON input is read.
+struct ARROW_EXPORT ReadOptions {
+ // Reader options
+
+ /// Whether to use the global CPU thread pool
+ bool use_threads = true;
+ /// Block size we request from the IO layer; also determines the size of
+ /// chunks when use_threads is true
+ ///
+ /// The default is 1 << 20 bytes.
+ int32_t block_size = 1 << 20; // 1 MB
+
+ /// Create read options with default values
+ static ReadOptions Defaults();
+};
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc
new file mode 100644
index 00000000000..05f155645a6
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc
@@ -0,0 +1,1099 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/parser.h"
+
+#include <functional>
+#include <limits>
+#include <tuple>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "arrow/json/rapidjson_defs.h"
+#include "rapidjson/error/en.h"
+#include "rapidjson/reader.h"
+
+#include "arrow/array.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/buffer_builder.h"
+#include "arrow/type.h"
+#include "arrow/util/bitset_stack.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/trie.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitsetStack;
+using internal::checked_cast;
+using internal::make_unique;
+using util::string_view;
+
+namespace json {
+
+namespace rj = arrow::rapidjson;
+
+/// Build a Status::Invalid whose message starts with "JSON parse error: "
+/// followed by the forwarded arguments.
+template <typename... T>
+static Status ParseError(T&&... t) {
+ return Status::Invalid("JSON parse error: ", std::forward<T>(t)...);
+}
+
+/// Return the textual name of a JSON kind.
+///
+/// The returned reference points into a function-local static table, which
+/// is indexed directly by the value of `kind`.
+const std::string& Kind::Name(Kind::type kind) {
+  static const std::string kind_names[] = {
+      "null", "boolean", "number", "string", "array", "object",
+  };
+  const std::string& kind_name = kind_names[kind];
+  return kind_name;
+}
+
+/// Return the metadata tag for a JSON kind: a single "json_kind" entry whose
+/// value is Kind::Name(kind).
+///
+/// Tags are built once and kept in a function-local static table indexed by
+/// the value of `kind`.
+const std::shared_ptr<const KeyValueMetadata>& Kind::Tag(Kind::type kind) {
+  using TagPtr = std::shared_ptr<const KeyValueMetadata>;
+
+  // Builds the tag for one kind; only invoked while the table is initialized.
+  const auto make_tag = [](Kind::type k) -> TagPtr {
+    return key_value_metadata({{"json_kind", Kind::Name(k)}});
+  };
+
+  static const TagPtr kind_tags[] = {
+      make_tag(Kind::kNull),   make_tag(Kind::kBoolean), make_tag(Kind::kNumber),
+      make_tag(Kind::kString), make_tag(Kind::kArray),   make_tag(Kind::kObject),
+  };
+  return kind_tags[kind];
+}
+
+/// Build a trie containing every kind name, appended in the order
+/// null, boolean, number, string, array, object.
+static arrow::internal::Trie MakeFromTagTrie() {
+  const Kind::type all_kinds[] = {Kind::kNull,   Kind::kBoolean, Kind::kNumber,
+                                  Kind::kString, Kind::kArray,   Kind::kObject};
+
+  arrow::internal::TrieBuilder trie_builder;
+  for (const Kind::type each_kind : all_kinds) {
+    DCHECK_OK(trie_builder.Append(Kind::Name(each_kind)));
+  }
+
+  auto trie = trie_builder.Finish();
+  DCHECK_OK(trie.Validate());
+  return trie;
+}
+
+/// Recover a kind from a metadata tag by looking up its "json_kind" value.
+///
+/// Both the presence of the key and the validity of the name are only
+/// checked with DCHECKs.
+Kind::type Kind::FromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
+  static arrow::internal::Trie name_to_kind = MakeFromTagTrie();
+
+  const auto key_position = tag->FindKey("json_kind");
+  DCHECK_NE(key_position, -1);
+
+  util::string_view kind_name = tag->value(key_position);
+  const auto kind_position = name_to_kind.Find(kind_name);
+  DCHECK_NE(kind_position, -1);
+
+  return static_cast<Kind::type>(kind_position);
+}
+
+/// Determine which JSON kind is used to parse values of an Arrow DataType.
+///
+/// On success the kind is written to `*kind`. Dictionary types resolve to the
+/// kind of their value type; types without a dedicated overload yield
+/// Status::NotImplemented.
+Status Kind::ForType(const DataType& type, Kind::type* kind) {
+  struct KindVisitor {
+    // Destination for the resolved kind.
+    Kind::type* out;
+
+    Status Assign(Kind::type resolved) {
+      *out = resolved;
+      return Status::OK();
+    }
+
+    Status Visit(const NullType&) { return Assign(Kind::kNull); }
+    Status Visit(const BooleanType&) { return Assign(Kind::kBoolean); }
+    Status Visit(const NumberType&) { return Assign(Kind::kNumber); }
+    Status Visit(const TimeType&) { return Assign(Kind::kNumber); }
+    Status Visit(const DateType&) { return Assign(Kind::kNumber); }
+    Status Visit(const BinaryType&) { return Assign(Kind::kString); }
+    Status Visit(const FixedSizeBinaryType&) { return Assign(Kind::kString); }
+    Status Visit(const ListType&) { return Assign(Kind::kArray); }
+    Status Visit(const StructType&) { return Assign(Kind::kObject); }
+    Status Visit(const DictionaryType& dict_type) {
+      // Dictionaries are parsed as their value type.
+      return Kind::ForType(*dict_type.value_type(), out);
+    }
+    Status Visit(const DataType& not_impl) {
+      return Status::NotImplemented("JSON parsing of ", not_impl);
+    }
+  };
+
+  KindVisitor visitor{kind};
+  return VisitTypeInline(type, &visitor);
+}
+
+/// \brief ArrayBuilder for parsed but unconverted arrays
+template <Kind::type>
+class RawArrayBuilder;
+
+/// \brief packed pointer to a RawArrayBuilder
+///
+/// RawArrayBuilders are stored in HandlerBase,
+/// which allows storage of their indices (uint32_t) instead of a full pointer.
+/// BuilderPtr is also tagged with the json kind and nullable properties
+/// so those can be accessed before dereferencing the builder.
+struct BuilderPtr {
+ // Default construction yields a copy of BuilderPtr::null.
+ BuilderPtr() : BuilderPtr(BuilderPtr::null) {}
+ BuilderPtr(Kind::type k, uint32_t i, bool n) : index(i), kind(k), nullable(n) {}
+
+ BuilderPtr(const BuilderPtr&) = default;
+ BuilderPtr& operator=(const BuilderPtr&) = default;
+ BuilderPtr(BuilderPtr&&) = default;
+ BuilderPtr& operator=(BuilderPtr&&) = default;
+
+ // index of builder in its arena
+ // OR the length of that builder if kind == Kind::kNull
+ // (we don't allocate an arena for nulls since they're trivial)
+ uint32_t index;
+ Kind::type kind;
+ bool nullable;
+
+ // Equality compares kind and index only; nullable is not considered.
+ bool operator==(BuilderPtr other) const {
+ return kind == other.kind && index == other.index;
+ }
+
+ bool operator!=(BuilderPtr other) const { return !(other == *this); }
+
+ // True unless this compares equal to BuilderPtr::null.
+ operator bool() const { return *this != null; }
+
+ bool operator!() const { return *this == null; }
+
+ // The static BuilderPtr for null type data
+ static const BuilderPtr null;
+};
+
+// Kind::kNull with index (i.e. length) 0, nullable.
+const BuilderPtr BuilderPtr::null(Kind::kNull, 0, true);
+
+/// Raw builder for JSON booleans.
+///
+/// Keeps two bit buffers of equal length: the boolean values and their
+/// validity. A null is stored as a `false` value with a `false` validity bit.
+template <>
+class RawArrayBuilder<Kind::kBoolean> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool) : values_(pool), validity_(pool) {}
+
+  /// Append one valid boolean.
+  Status Append(bool value) {
+    RETURN_NOT_OK(values_.Append(value));
+    return validity_.Append(true);
+  }
+
+  /// Append one null.
+  Status AppendNull() {
+    RETURN_NOT_OK(values_.Append(false));
+    return validity_.Append(false);
+  }
+
+  /// Append `count` nulls.
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(values_.Append(count, false));
+    return validity_.Append(count, false);
+  }
+
+  /// Produce a boolean array from the accumulated values.
+  Status Finish(std::shared_ptr<Array>* out) {
+    // Length and null count are read before the buffers are finished.
+    const auto num_values = length();
+    const auto num_nulls = validity_.false_count();
+
+    std::shared_ptr<Buffer> values_buffer, validity_buffer;
+    RETURN_NOT_OK(values_.Finish(&values_buffer));
+    RETURN_NOT_OK(validity_.Finish(&validity_buffer));
+
+    *out = MakeArray(ArrayData::Make(boolean(), num_values,
+                                     {validity_buffer, values_buffer}, num_nulls));
+    return Status::OK();
+  }
+
+  /// Number of values (valid or null) appended so far.
+  int64_t length() { return validity_.length(); }
+
+ private:
+  TypedBufferBuilder<bool> values_;
+  TypedBufferBuilder<bool> validity_;
+};
+
+/// \brief builder for strings or unconverted numbers
+///
+/// Neither strings nor numbers are stored here directly: each entry is only
+/// an int32 index referring into a separate array that holds the characters
+/// of all scalars. Appending is therefore cheap (no per-value character
+/// allocation) while reading back goes through the index.
+///
+/// Finish() emits just the index array; the caller pairs it with the
+/// character storage (see RawBuilderSet, which wraps both in a
+/// dictionary-encoded array).
+class ScalarBuilder {
+ public:
+  explicit ScalarBuilder(MemoryPool* pool)
+      : total_values_length_(0), indices_(pool), validity_(pool) {}
+
+  /// Append a valid entry referring to `index`; `value_length` is added to
+  /// the running total reported by values_length().
+  Status Append(int32_t index, int32_t value_length) {
+    RETURN_NOT_OK(indices_.Append(index));
+    total_values_length_ += value_length;
+    return validity_.Append(true);
+  }
+
+  /// Append one null (stored as index 0 with a cleared validity bit).
+  Status AppendNull() {
+    RETURN_NOT_OK(indices_.Append(0));
+    return validity_.Append(false);
+  }
+
+  /// Append `count` nulls.
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(indices_.Append(count, 0));
+    return validity_.Append(count, false);
+  }
+
+  /// Produce an int32 array of the accumulated indices.
+  Status Finish(std::shared_ptr<Array>* out) {
+    // Length and null count are read before the buffers are finished.
+    const auto num_values = length();
+    const auto num_nulls = validity_.false_count();
+
+    std::shared_ptr<Buffer> indices_buffer, validity_buffer;
+    RETURN_NOT_OK(indices_.Finish(&indices_buffer));
+    RETURN_NOT_OK(validity_.Finish(&validity_buffer));
+
+    *out = MakeArray(ArrayData::Make(int32(), num_values,
+                                     {validity_buffer, indices_buffer}, num_nulls));
+    return Status::OK();
+  }
+
+  /// Number of entries (valid or null) appended so far.
+  int64_t length() { return validity_.length(); }
+
+  /// Sum of the `value_length` arguments passed to Append().
+  int32_t values_length() { return total_values_length_; }
+
+ private:
+  int32_t total_values_length_;
+  TypedBufferBuilder<int32_t> indices_;
+  TypedBufferBuilder<bool> validity_;
+};
+
+/// Raw builder for JSON numbers: a ScalarBuilder with inherited constructors
+/// and no additional members.
+template <>
+class RawArrayBuilder<Kind::kNumber> : public ScalarBuilder {
+ public:
+ using ScalarBuilder::ScalarBuilder;
+};
+
+/// Raw builder for JSON strings: a ScalarBuilder with inherited constructors
+/// and no additional members.
+template <>
+class RawArrayBuilder<Kind::kString> : public ScalarBuilder {
+ public:
+ using ScalarBuilder::ScalarBuilder;
+};
+
+/// Raw builder for JSON arrays.
+///
+/// Stores int32 offsets and a validity bitmap; the element values live in a
+/// separate child builder identified by a BuilderPtr.
+template <>
+class RawArrayBuilder<Kind::kArray> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool) : offsets_(pool), validity_(pool) {}
+
+  /// Append a valid list with `child_length` elements.
+  Status Append(int32_t child_length) {
+    RETURN_NOT_OK(offsets_.Append(next_offset_));
+    next_offset_ += child_length;
+    return validity_.Append(true);
+  }
+
+  /// Append one null list (the running offset does not advance).
+  Status AppendNull() {
+    RETURN_NOT_OK(offsets_.Append(next_offset_));
+    return validity_.Append(false);
+  }
+
+  /// Append `count` null lists.
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(offsets_.Append(count, next_offset_));
+    return validity_.Append(count, false);
+  }
+
+  /// Produce a list array; `finish_child` is invoked to finish the element
+  /// builder. The element field is named "item" and carries the child's
+  /// nullability and kind tag.
+  Status Finish(std::function<Status(BuilderPtr, std::shared_ptr<Array>*)> finish_child,
+                std::shared_ptr<Array>* out) {
+    // Close the offsets with the final end offset.
+    RETURN_NOT_OK(offsets_.Append(next_offset_));
+
+    const auto num_lists = length();
+    const auto num_nulls = validity_.false_count();
+
+    std::shared_ptr<Buffer> offsets_buffer, validity_buffer;
+    RETURN_NOT_OK(offsets_.Finish(&offsets_buffer));
+    RETURN_NOT_OK(validity_.Finish(&validity_buffer));
+
+    std::shared_ptr<Array> child_values;
+    RETURN_NOT_OK(finish_child(value_builder_, &child_values));
+
+    auto item_field = field("item", child_values->type(), value_builder_.nullable,
+                            Kind::Tag(value_builder_.kind));
+    auto list_type = list(item_field);
+    *out = MakeArray(ArrayData::Make(list_type, num_lists,
+                                     {validity_buffer, offsets_buffer},
+                                     {child_values->data()}, num_nulls));
+    return Status::OK();
+  }
+
+  /// Handle of the element builder.
+  BuilderPtr value_builder() const { return value_builder_; }
+
+  /// Replace the handle of the element builder.
+  void value_builder(BuilderPtr builder) { value_builder_ = builder; }
+
+  /// Number of lists (valid or null) appended so far.
+  int64_t length() { return validity_.length(); }
+
+ private:
+  BuilderPtr value_builder_ = BuilderPtr::null;
+  int32_t next_offset_ = 0;
+  TypedBufferBuilder<int32_t> offsets_;
+  TypedBufferBuilder<bool> validity_;
+};
+
+/// Raw builder for JSON objects.
+///
+/// Stores only a validity bitmap plus, for each field, the BuilderPtr of its
+/// child builder. Field names are mapped to field indices through a hash map.
+template <>
+class RawArrayBuilder<Kind::kObject> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool) : validity_(pool) {}
+
+  /// Append one valid object.
+  Status Append() { return validity_.Append(true); }
+
+  /// Append one null object.
+  Status AppendNull() { return validity_.Append(false); }
+
+  /// Append `count` null objects.
+  Status AppendNull(int64_t count) { return validity_.Append(count, false); }
+
+  /// Name of the field at index `i`, or an empty string if no name maps to it
+  /// (linear search over the name map).
+  std::string FieldName(int i) const {
+    for (auto it = index_by_name_.begin(); it != index_by_name_.end(); ++it) {
+      if (it->second == i) {
+        return it->first;
+      }
+    }
+    return "";
+  }
+
+  /// Index of the field called `name`, or -1 if there is none.
+  int GetFieldIndex(const std::string& name) const {
+    const auto found = index_by_name_.find(name);
+    return found != index_by_name_.end() ? found->second : -1;
+  }
+
+  /// Register a new field and return its index.
+  int AddField(std::string name, BuilderPtr builder) {
+    const int new_index = num_fields();
+    children_.push_back(builder);
+    index_by_name_.emplace(std::move(name), new_index);
+    return new_index;
+  }
+
+  int num_fields() const { return static_cast<int>(children_.size()); }
+
+  /// Handle of the child builder for field `index`.
+  BuilderPtr field_builder(int index) const { return children_[index]; }
+
+  /// Replace the handle of the child builder for field `index`.
+  void field_builder(int index, BuilderPtr builder) { children_[index] = builder; }
+
+  /// Produce a struct array; `finish_child` is invoked once per field, in
+  /// index order. Each field carries its child's nullability and kind tag.
+  Status Finish(std::function<Status(BuilderPtr, std::shared_ptr<Array>*)> finish_child,
+                std::shared_ptr<Array>* out) {
+    const auto num_rows = length();
+    const auto num_nulls = validity_.false_count();
+    std::shared_ptr<Buffer> validity_buffer;
+    RETURN_NOT_OK(validity_.Finish(&validity_buffer));
+
+    // Invert the name -> index map so names can be read in field order.
+    std::vector<string_view> names_by_index(num_fields());
+    for (const auto& entry : index_by_name_) {
+      names_by_index[entry.second] = entry.first;
+    }
+
+    std::vector<std::shared_ptr<Field>> fields(num_fields());
+    std::vector<std::shared_ptr<ArrayData>> child_data(num_fields());
+    for (int i = 0; i < num_fields(); ++i) {
+      std::shared_ptr<Array> child_values;
+      RETURN_NOT_OK(finish_child(children_[i], &child_values));
+      child_data[i] = child_values->data();
+      fields[i] = field(std::string(names_by_index[i]), child_values->type(),
+                        children_[i].nullable, Kind::Tag(children_[i].kind));
+    }
+
+    *out = MakeArray(ArrayData::Make(struct_(std::move(fields)), num_rows,
+                                     {validity_buffer}, std::move(child_data),
+                                     num_nulls));
+    return Status::OK();
+  }
+
+  /// Number of objects (valid or null) appended so far.
+  int64_t length() { return validity_.length(); }
+
+ private:
+  std::vector<BuilderPtr> children_;
+  std::unordered_map<std::string, int> index_by_name_;
+  TypedBufferBuilder<bool> validity_;
+};
+
+/// Owner of every RawArrayBuilder used during parsing.
+///
+/// Builders are kept in one std::vector ("arena") per kind and are referred
+/// to through BuilderPtr handles whose index is the position in the arena of
+/// that kind. Kind::kNull has no arena; for that kind the handle's index
+/// holds the number of nulls instead.
+class RawBuilderSet {
+ public:
+ explicit RawBuilderSet(MemoryPool* pool) : pool_(pool) {}
+
+ /// Retrieve a pointer to a builder from a BuilderPtr
+ ///
+ /// The pointer refers into the arena vector, so creating another builder
+ /// of the same kind afterwards may invalidate it.
+ template <Kind::type kind>
+ enable_if_t<kind != Kind::kNull, RawArrayBuilder<kind>*> Cast(BuilderPtr builder) {
+ DCHECK_EQ(builder.kind, kind);
+ return arena<kind>().data() + builder.index;
+ }
+
+ /// construct a builder of statically defined kind
+ ///
+ /// The new builder is nullable and starts with `leading_nulls` nulls.
+ template <Kind::type kind>
+ Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) {
+ builder->index = static_cast<uint32_t>(arena<kind>().size());
+ builder->kind = kind;
+ builder->nullable = true;
+ arena<kind>().emplace_back(RawArrayBuilder<kind>(pool_));
+ return Cast<kind>(*builder)->AppendNull(leading_nulls);
+ }
+
+ /// construct a builder of whatever kind corresponds to a DataType
+ ///
+ /// List and struct types recurse into their child types; child builders
+ /// take their nullability from the corresponding field.
+ Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) {
+ Kind::type kind;
+ RETURN_NOT_OK(Kind::ForType(t, &kind));
+ switch (kind) {
+ case Kind::kNull:
+ // no arena for nulls: the count is stored in the handle itself
+ *builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true);
+ return Status::OK();
+
+ case Kind::kBoolean:
+ return MakeBuilder<Kind::kBoolean>(leading_nulls, builder);
+
+ case Kind::kNumber:
+ return MakeBuilder<Kind::kNumber>(leading_nulls, builder);
+
+ case Kind::kString:
+ return MakeBuilder<Kind::kString>(leading_nulls, builder);
+
+ case Kind::kArray: {
+ RETURN_NOT_OK(MakeBuilder<Kind::kArray>(leading_nulls, builder));
+ const auto& list_type = checked_cast<const ListType&>(t);
+
+ // the element builder starts with no leading nulls
+ BuilderPtr value_builder;
+ RETURN_NOT_OK(MakeBuilder(*list_type.value_type(), 0, &value_builder));
+ value_builder.nullable = list_type.value_field()->nullable();
+
+ // Cast is performed only after the recursive call has returned
+ Cast<Kind::kArray>(*builder)->value_builder(value_builder);
+ return Status::OK();
+ }
+ case Kind::kObject: {
+ RETURN_NOT_OK(MakeBuilder<Kind::kObject>(leading_nulls, builder));
+ const auto& struct_type = checked_cast<const StructType&>(t);
+
+ for (const auto& f : struct_type.fields()) {
+ // each field builder receives the same number of leading nulls
+ BuilderPtr field_builder;
+ RETURN_NOT_OK(MakeBuilder(*f->type(), leading_nulls, &field_builder));
+ field_builder.nullable = f->nullable();
+
+ // Cast is repeated on every iteration, after the recursive call
+ Cast<Kind::kObject>(*builder)->AddField(f->name(), field_builder);
+ }
+ return Status::OK();
+ }
+ default:
+ return Status::NotImplemented("invalid builder type");
+ }
+ }
+
+ /// Appending null is slightly tricky since null count is stored inline
+ /// for builders of Kind::kNull. Append nulls using this helper
+ ///
+ /// `parent` and `field_index` are only used when `builder` is of
+ /// Kind::kNull. Returns a parse error if `builder` is not nullable.
+ Status AppendNull(BuilderPtr parent, int field_index, BuilderPtr builder) {
+ if (ARROW_PREDICT_FALSE(!builder.nullable)) {
+ return ParseError("a required field was null");
+ }
+ switch (builder.kind) {
+ case Kind::kNull: {
+ DCHECK_EQ(builder, parent.kind == Kind::kArray
+ ? Cast<Kind::kArray>(parent)->value_builder()
+ : Cast<Kind::kObject>(parent)->field_builder(field_index));
+
+ // increment null count stored inline
+ builder.index += 1;
+
+ // update the parent, since changing builder doesn't affect parent
+ if (parent.kind == Kind::kArray) {
+ Cast<Kind::kArray>(parent)->value_builder(builder);
+ } else {
+ Cast<Kind::kObject>(parent)->field_builder(field_index, builder);
+ }
+ return Status::OK();
+ }
+ case Kind::kBoolean:
+ return Cast<Kind::kBoolean>(builder)->AppendNull();
+
+ case Kind::kNumber:
+ return Cast<Kind::kNumber>(builder)->AppendNull();
+
+ case Kind::kString:
+ return Cast<Kind::kString>(builder)->AppendNull();
+
+ case Kind::kArray:
+ return Cast<Kind::kArray>(builder)->AppendNull();
+
+ case Kind::kObject: {
+ auto struct_builder = Cast<Kind::kObject>(builder);
+ RETURN_NOT_OK(struct_builder->AppendNull());
+
+ // a null struct also appends a null to each of its fields
+ for (int i = 0; i < struct_builder->num_fields(); ++i) {
+ auto field_builder = struct_builder->field_builder(i);
+ RETURN_NOT_OK(AppendNull(builder, i, field_builder));
+ }
+ return Status::OK();
+ }
+ default:
+ return Status::NotImplemented("invalid builder Kind");
+ }
+ }
+
+ /// Finish the builder referred to by `builder` (and, recursively, its
+ /// children) into `out`. `scalar_values` is the array that the indices of
+ /// number and string builders refer into.
+ Status Finish(const std::shared_ptr<Array>& scalar_values, BuilderPtr builder,
+ std::shared_ptr<Array>* out) {
+ // callback handed to nested builders so they can finish their children
+ auto finish_children = [this, &scalar_values](BuilderPtr child,
+ std::shared_ptr<Array>* out) {
+ return Finish(scalar_values, child, out);
+ };
+ switch (builder.kind) {
+ case Kind::kNull: {
+ // for Kind::kNull the index is the length
+ auto length = static_cast<int64_t>(builder.index);
+ *out = std::make_shared<NullArray>(length);
+ return Status::OK();
+ }
+ case Kind::kBoolean:
+ return Cast<Kind::kBoolean>(builder)->Finish(out);
+
+ case Kind::kNumber:
+ return FinishScalar(scalar_values, Cast<Kind::kNumber>(builder), out);
+
+ case Kind::kString:
+ return FinishScalar(scalar_values, Cast<Kind::kString>(builder), out);
+
+ case Kind::kArray:
+ return Cast<Kind::kArray>(builder)->Finish(std::move(finish_children), out);
+
+ case Kind::kObject:
+ return Cast<Kind::kObject>(builder)->Finish(std::move(finish_children), out);
+
+ default:
+ return Status::NotImplemented("invalid builder kind");
+ }
+ }
+
+ private:
+ /// finish a column of scalar values (string or number)
+ ///
+ /// The result is a DictionaryArray whose int32 indices come from `builder`
+ /// and whose dictionary is `scalar_values`.
+ Status FinishScalar(const std::shared_ptr<Array>& scalar_values, ScalarBuilder* builder,
+ std::shared_ptr<Array>* out) {
+ std::shared_ptr<Array> indices;
+ // TODO(bkietz) embed builder->values_length() in this output somehow
+ RETURN_NOT_OK(builder->Finish(&indices));
+ auto ty = dictionary(int32(), scalar_values->type());
+ *out = std::make_shared<DictionaryArray>(ty, indices, scalar_values);
+ return Status::OK();
+ }
+
+ /// Arena holding all builders of the given kind (selected by the kind's
+ /// value as a tuple position).
+ template <Kind::type kind>
+ std::vector<RawArrayBuilder<kind>>& arena() {
+ return std::get<static_cast<std::size_t>(kind)>(arenas_);
+ }
+
+ MemoryPool* pool_;
+ // The first slot is an empty tuple: Kind::kNull needs no arena.
+ std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>,
+ std::vector<RawArrayBuilder<Kind::kNumber>>,
+ std::vector<RawArrayBuilder<Kind::kString>>,
+ std::vector<RawArrayBuilder<Kind::kArray>>,
+ std::vector<RawArrayBuilder<Kind::kObject>>>
+ arenas_;
+};
+
+/// Three implementations are provided for BlockParser, one for each
+/// UnexpectedFieldBehavior. However most of the logic is identical in each
+/// case, so the majority of the implementation is in this base class
+class HandlerBase : public BlockParser,
+ public rj::BaseReaderHandler<rj::UTF8<>, HandlerBase> {
+ public:
+ explicit HandlerBase(MemoryPool* pool)
+ : BlockParser(pool),
+ builder_set_(pool),
+ field_index_(-1),
+ scalar_values_builder_(pool) {}
+
+ /// Retrieve a pointer to a builder from a BuilderPtr
+ template <Kind::type kind>
+ enable_if_t<kind != Kind::kNull, RawArrayBuilder<kind>*> Cast(BuilderPtr builder) {
+ return builder_set_.Cast<kind>(builder);
+ }
+
+ /// Accessor for a stored error Status
+ Status Error() { return status_; }
+
+ /// \defgroup rapidjson-handler-interface functions expected by rj::Reader
+ ///
+ /// bool Key(const char* data, rj::SizeType size, ...) is omitted since
+ /// the behavior varies greatly between UnexpectedFieldBehaviors
+ ///
+ /// @{
+ bool Null() {
+ status_ = builder_set_.AppendNull(builder_stack_.back(), field_index_, builder_);
+ return status_.ok();
+ }
+
+ bool Bool(bool value) {
+ constexpr auto kind = Kind::kBoolean;
+ if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+ status_ = IllegallyChangedTo(kind);
+ return status_.ok();
+ }
+ status_ = Cast<kind>(builder_)->Append(value);
+ return status_.ok();
+ }
+
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
+ status_ = AppendScalar<Kind::kNumber>(builder_, string_view(data, size));
+ return status_.ok();
+ }
+
+ bool String(const char* data, rj::SizeType size, ...) {
+ status_ = AppendScalar<Kind::kString>(builder_, string_view(data, size));
+ return status_.ok();
+ }
+
+ bool StartObject() {
+ status_ = StartObjectImpl();
+ return status_.ok();
+ }
+
+ bool EndObject(...) {
+ status_ = EndObjectImpl();
+ return status_.ok();
+ }
+
+ bool StartArray() {
+ status_ = StartArrayImpl();
+ return status_.ok();
+ }
+
+ bool EndArray(rj::SizeType size) {
+ status_ = EndArrayImpl(size);
+ return status_.ok();
+ }
+ /// @}
+
+ /// \brief Set up builders using an expected Schema
+ Status Initialize(const std::shared_ptr<Schema>& s) {
+ auto type = struct_({});
+ if (s) {
+ type = struct_(s->fields());
+ }
+ return builder_set_.MakeBuilder(*type, 0, &builder_);
+ }
+
+ Status Finish(std::shared_ptr<Array>* parsed) override {
+ std::shared_ptr<Array> scalar_values;
+ RETURN_NOT_OK(scalar_values_builder_.Finish(&scalar_values));
+ return builder_set_.Finish(scalar_values, builder_, parsed);
+ }
+
+ /// \brief Emit path of current field for debugging purposes
+ std::string Path() {
+ std::string path;
+ for (size_t i = 0; i < builder_stack_.size(); ++i) {
+ auto builder = builder_stack_[i];
+ if (builder.kind == Kind::kArray) {
+ path += "/[]";
+ } else {
+ auto struct_builder = Cast<Kind::kObject>(builder);
+ auto field_index = field_index_;
+ if (i + 1 < field_index_stack_.size()) {
+ field_index = field_index_stack_[i + 1];
+ }
+ path += "/" + struct_builder->FieldName(field_index);
+ }
+ }
+ return path;
+ }
+
+ protected:
+ template <typename Handler, typename Stream>
+ Status DoParse(Handler& handler, Stream&& json) {
+ constexpr auto parse_flags = rj::kParseIterativeFlag | rj::kParseNanAndInfFlag |
+ rj::kParseStopWhenDoneFlag |
+ rj::kParseNumbersAsStringsFlag;
+
+ rj::Reader reader;
+
+ for (; num_rows_ < kMaxParserNumRows; ++num_rows_) {
+ auto ok = reader.Parse<parse_flags>(json, handler);
+ switch (ok.Code()) {
+ case rj::kParseErrorNone:
+ // parse the next object
+ continue;
+ case rj::kParseErrorDocumentEmpty:
+ // parsed all objects, finish
+ return Status::OK();
+ case rj::kParseErrorTermination:
+ // handler emitted an error
+ return handler.Error();
+ default:
+ // rj emitted an error
+ return ParseError(rj::GetParseError_En(ok.Code()), " in row ", num_rows_);
+ }
+ }
+ return Status::Invalid("Exceeded maximum rows");
+ }
+
+ template <typename Handler>
+ Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
+ RETURN_NOT_OK(ReserveScalarStorage(json->size()));
+ rj::MemoryStream ms(reinterpret_cast<const char*>(json->data()), json->size());
+ using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
+ return DoParse(handler, InputStream(ms));
+ }
+
+ /// \defgroup handlerbase-append-methods append non-nested values
+ ///
+ /// @{
+
+ template <Kind::type kind>
+ Status AppendScalar(BuilderPtr builder, string_view scalar) {
+ if (ARROW_PREDICT_FALSE(builder.kind != kind)) {
+ return IllegallyChangedTo(kind);
+ }
+ auto index = static_cast<int32_t>(scalar_values_builder_.length());
+ auto value_length = static_cast<int32_t>(scalar.size());
+ RETURN_NOT_OK(Cast<kind>(builder)->Append(index, value_length));
+ RETURN_NOT_OK(scalar_values_builder_.Reserve(1));
+ scalar_values_builder_.UnsafeAppend(scalar);
+ return Status::OK();
+ }
+
+ /// @}
+
+ Status StartObjectImpl() {
+ constexpr auto kind = Kind::kObject;
+ if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+ return IllegallyChangedTo(kind);
+ }
+ auto struct_builder = Cast<kind>(builder_);
+ absent_fields_stack_.Push(struct_builder->num_fields(), true);
+ StartNested();
+ return struct_builder->Append();
+ }
+
+ /// \brief helper for Key() functions
+ ///
+ /// sets the field builder with name key, or returns false if
+ /// there is no field with that name
+ bool SetFieldBuilder(string_view key, bool* duplicate_keys) {
+ auto parent = Cast<Kind::kObject>(builder_stack_.back());
+ field_index_ = parent->GetFieldIndex(std::string(key));
+ if (ARROW_PREDICT_FALSE(field_index_ == -1)) {
+ return false;
+ }
+ *duplicate_keys = !absent_fields_stack_[field_index_];
+ if (*duplicate_keys) {
+ status_ = ParseError("Column(", Path(), ") was specified twice in row ", num_rows_);
+ return false;
+ }
+ builder_ = parent->field_builder(field_index_);
+ absent_fields_stack_[field_index_] = false;
+ return true;
+ }
+
+ Status EndObjectImpl() {
+ auto parent = builder_stack_.back();
+
+ auto expected_count = absent_fields_stack_.TopSize();
+ for (int i = 0; i < expected_count; ++i) {
+ if (!absent_fields_stack_[i]) {
+ continue;
+ }
+ auto field_builder = Cast<Kind::kObject>(parent)->field_builder(i);
+ if (ARROW_PREDICT_FALSE(!field_builder.nullable)) {
+ return ParseError("a required field was absent");
+ }
+ RETURN_NOT_OK(builder_set_.AppendNull(parent, i, field_builder));
+ }
+ absent_fields_stack_.Pop();
+ EndNested();
+ return Status::OK();
+ }
+
+ Status StartArrayImpl() {
+ constexpr auto kind = Kind::kArray;
+ if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+ return IllegallyChangedTo(kind);
+ }
+ StartNested();
+ // append to the list builder in EndArrayImpl
+ builder_ = Cast<kind>(builder_)->value_builder();
+ return Status::OK();
+ }
+
+ Status EndArrayImpl(rj::SizeType size) {
+ EndNested();
+ // append to list_builder here
+ auto list_builder = Cast<Kind::kArray>(builder_);
+ return list_builder->Append(size);
+ }
+
+ /// helper method for StartArray and StartObject
+ /// adds the current builder to a stack so its
+ /// children can be visited and parsed.
+ void StartNested() {
+ field_index_stack_.push_back(field_index_);
+ field_index_ = -1;
+ builder_stack_.push_back(builder_);
+ }
+
+ /// Shared by EndArray and EndObject: pops the parent builder and its field
+ /// index off their stacks and makes them current again, so that parsing of
+ /// the parent can continue.
+ void EndNested() {
+   builder_ = builder_stack_.back();
+   builder_stack_.pop_back();
+   field_index_ = field_index_stack_.back();
+   field_index_stack_.pop_back();
+ }
+
+ /// Build the parse error returned when the kind of the current builder does
+ /// not match the kind of the JSON value which was just encountered.
+ Status IllegallyChangedTo(Kind::type illegally_changed_to) {
+   const auto& previous_kind = Kind::Name(builder_.kind);
+   const auto& new_kind = Kind::Name(illegally_changed_to);
+   return ParseError("Column(", Path(), ") changed from ", previous_kind, " to ",
+                     new_kind, " in row ", num_rows_);
+ }
+
+ /// Reserve storage for scalars, these can occupy almost all of the JSON buffer.
+ /// Only the difference between `size` and the unused capacity of the scalar
+ /// values builder is requested; nothing is reserved if `size` already fits.
+ Status ReserveScalarStorage(int64_t size) override {
+   const auto unused_capacity = scalar_values_builder_.value_data_capacity() -
+                                scalar_values_builder_.value_data_length();
+   if (size > unused_capacity) {
+     return scalar_values_builder_.ReserveData(size - unused_capacity);
+   }
+   return Status::OK();
+ }
+
+ Status status_;
+ RawBuilderSet builder_set_;
+ BuilderPtr builder_;
+ // top of this stack is the parent of builder_
+ std::vector<BuilderPtr> builder_stack_;
+ // top of this stack refers to the fields of the highest *StructBuilder*
+ // in builder_stack_ (list builders don't have absent fields)
+ BitsetStack absent_fields_stack_;
+ // index of builder_ within its parent
+ int field_index_;
+ // top of this stack == field_index_
+ std::vector<int> field_index_stack_;
+ StringBuilder scalar_values_builder_;
+};
+
+// The primary template is only declared; an explicit specialization follows
+// for each UnexpectedFieldBehavior value dispatched on in BlockParser::Make.
+template <UnexpectedFieldBehavior>
+class Handler;
+
+/// Handler which reports a parse error for any key that SetFieldBuilder does
+/// not resolve to a field of the enclosing struct builder.
+template <>
+class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
+ public:
+  using HandlerBase::HandlerBase;
+
+  /// Parse a block of JSON, dispatching parser events to this handler.
+  Status Parse(const std::shared_ptr<Buffer>& json) override {
+    return DoParse(*this, json);
+  }
+
+  /// \ingroup rapidjson-handler-interface
+  ///
+  /// if an unexpected field is encountered, emit a parse error and bail
+  ///
+  /// \return true if SetFieldBuilder selected a builder for the key, false
+  /// (with status_ holding the error) otherwise
+  bool Key(const char* key, rj::SizeType len, ...) {
+    bool duplicate_keys = false;
+    // Resolving the key is the expected outcome, so the success path is the
+    // one hinted as likely (as in the Ignore and InferType handlers).
+    if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) {
+      return true;
+    }
+    // For a duplicated key SetFieldBuilder has already stored an error in
+    // status_; only the unknown-field case is reported here.
+    if (!duplicate_keys) {
+      status_ = ParseError("unexpected field");
+    }
+    return false;
+  }
+};
+
+// Handler which silently skips the value of any key that SetFieldBuilder does
+// not resolve. depth_ counts the objects currently open; skip_depth_ is the
+// depth at which skipping began, or the maximum int when nothing is skipped.
+template <>
+class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
+ public:
+ using HandlerBase::HandlerBase;
+
+ Status Parse(const std::shared_ptr<Buffer>& json) override {
+ return DoParse(*this, json);
+ }
+
+ // The scalar callbacks below report success without forwarding to
+ // HandlerBase while a value is being skipped.
+ bool Null() {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::Null();
+ }
+
+ bool Bool(bool value) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::Bool(value);
+ }
+
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::RawNumber(data, size);
+ }
+
+ bool String(const char* data, rj::SizeType size, ...) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::String(data, size);
+ }
+
+ // depth_ is incremented even while skipping so that the matching EndObject
+ // can restore it.
+ bool StartObject() {
+ ++depth_;
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::StartObject();
+ }
+
+ /// \ingroup rapidjson-handler-interface
+ ///
+ /// if an unexpected field is encountered, skip until its value has been consumed
+ bool Key(const char* key, rj::SizeType len, ...) {
+ // a key at the depth where skipping began ends the skipped value
+ MaybeStopSkipping();
+ if (Skipping()) {
+ return true;
+ }
+ bool duplicate_keys = false;
+ if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) {
+ return true;
+ }
+ // duplicated key: SetFieldBuilder has already stored the error in status_
+ if (ARROW_PREDICT_FALSE(duplicate_keys)) {
+ return false;
+ }
+ // unknown key: skip everything until the next key or object end at this depth
+ skip_depth_ = depth_;
+ return true;
+ }
+
+ bool EndObject(...) {
+ // the end of the object at the skip depth also ends skipping; this is
+ // checked before depth_ is decremented
+ MaybeStopSkipping();
+ --depth_;
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::EndObject();
+ }
+
+ // Arrays do not modify depth_; they are only forwarded or skipped.
+ bool StartArray() {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::StartArray();
+ }
+
+ bool EndArray(rj::SizeType size) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::EndArray(size);
+ }
+
+ private:
+ // true while the current position is at or below the depth where skipping began
+ bool Skipping() { return depth_ >= skip_depth_; }
+
+ // reset skip_depth_ to its "not skipping" sentinel once parsing is back at
+ // the depth where skipping began
+ void MaybeStopSkipping() {
+ if (skip_depth_ == depth_) {
+ skip_depth_ = std::numeric_limits<int>::max();
+ }
+ }
+
+ // number of objects currently open
+ int depth_ = 0;
+ // depth at which skipping began; max int means nothing is being skipped
+ int skip_depth_ = std::numeric_limits<int>::max();
+};
+
+// Handler which adds a null-kind builder for any key that SetFieldBuilder
+// does not resolve, and replaces a null-kind builder with a builder of the
+// appropriate kind when a non-null value arrives for it.
+template <>
+class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
+ public:
+ using HandlerBase::HandlerBase;
+
+ Status Parse(const std::shared_ptr<Buffer>& json) override {
+ return DoParse(*this, json);
+ }
+
+ // Each value callback below first promotes the current builder if it is
+ // still of null kind, and returns false if that promotion failed.
+ bool Bool(bool value) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kBoolean>())) {
+ return false;
+ }
+ return HandlerBase::Bool(value);
+ }
+
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) {
+ return false;
+ }
+ return HandlerBase::RawNumber(data, size);
+ }
+
+ bool String(const char* data, rj::SizeType size, ...) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) {
+ return false;
+ }
+ return HandlerBase::String(data, size);
+ }
+
+ bool StartObject() {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kObject>())) {
+ return false;
+ }
+ return HandlerBase::StartObject();
+ }
+
+ /// \ingroup rapidjson-handler-interface
+ ///
+ /// If an unexpected field is encountered, add a new builder to
+ /// the current parent builder. It is added as a NullBuilder with
+ /// (parent.length - 1) leading nulls. The next value parsed
+ /// will probably trigger promotion of this field from null
+ bool Key(const char* key, rj::SizeType len, ...) {
+ bool duplicate_keys = false;
+ if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) {
+ return true;
+ }
+ // duplicated key: SetFieldBuilder has already stored the error in status_
+ if (ARROW_PREDICT_FALSE(duplicate_keys)) {
+ return false;
+ }
+ // unknown key: register a new null-kind field on the enclosing struct builder
+ auto struct_builder = Cast<Kind::kObject>(builder_stack_.back());
+ auto leading_nulls = static_cast<uint32_t>(struct_builder->length() - 1);
+ builder_ = BuilderPtr(Kind::kNull, leading_nulls, true);
+ field_index_ = struct_builder->AddField(std::string(key, len), builder_);
+ return true;
+ }
+
+ bool StartArray() {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kArray>())) {
+ return false;
+ }
+ return HandlerBase::StartArray();
+ }
+
+ private:
+ // return true if a terminal error was encountered
+ //
+ // Does nothing unless builder_ is of null kind. Otherwise a builder of
+ // `kind` is created through builder_set_ and installed in the parent (a
+ // list's value builder or a struct's field builder) in place of the null one.
+ template <Kind::type kind>
+ bool MaybePromoteFromNull() {
+ if (ARROW_PREDICT_TRUE(builder_.kind != Kind::kNull)) {
+ return false;
+ }
+ auto parent = builder_stack_.back();
+ if (parent.kind == Kind::kArray) {
+ auto list_builder = Cast<Kind::kArray>(parent);
+ DCHECK_EQ(list_builder->value_builder(), builder_);
+ status_ = builder_set_.MakeBuilder<kind>(builder_.index, &builder_);
+ if (ARROW_PREDICT_FALSE(!status_.ok())) {
+ return true;
+ }
+ // NOTE(review): the parent is cast again after MakeBuilder, presumably
+ // because creating a builder can invalidate the pointer obtained above
+ // -- confirm against RawBuilderSet
+ list_builder = Cast<Kind::kArray>(parent);
+ list_builder->value_builder(builder_);
+ } else {
+ auto struct_builder = Cast<Kind::kObject>(parent);
+ DCHECK_EQ(struct_builder->field_builder(field_index_), builder_);
+ status_ = builder_set_.MakeBuilder<kind>(builder_.index, &builder_);
+ if (ARROW_PREDICT_FALSE(!status_.ok())) {
+ return true;
+ }
+ // NOTE(review): re-cast after MakeBuilder, as in the list branch
+ struct_builder = Cast<Kind::kObject>(parent);
+ struct_builder->field_builder(field_index_, builder_);
+ }
+ return false;
+ }
+};
+
+/// Construct the Handler specialization selected by
+/// options.unexpected_field_behavior and initialize it with
+/// options.explicit_schema.
+Status BlockParser::Make(MemoryPool* pool, const ParseOptions& options,
+                         std::unique_ptr<BlockParser>* out) {
+  using Behavior = UnexpectedFieldBehavior;
+  const auto behavior = options.unexpected_field_behavior;
+  // an explicit schema is required unless unexpected fields are inferred
+  DCHECK(behavior == Behavior::InferType || options.explicit_schema != nullptr);
+
+  switch (behavior) {
+    case Behavior::Ignore:
+      *out = make_unique<Handler<Behavior::Ignore>>(pool);
+      break;
+    case Behavior::Error:
+      *out = make_unique<Handler<Behavior::Error>>(pool);
+      break;
+    case Behavior::InferType:
+      *out = make_unique<Handler<Behavior::InferType>>(pool);
+      break;
+  }
+  auto& handler = static_cast<HandlerBase&>(**out);
+  return handler.Initialize(options.explicit_schema);
+}
+
+/// Overload of Make which uses the default memory pool.
+Status BlockParser::Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out) {
+  MemoryPool* pool = default_memory_pool();
+  return BlockParser::Make(pool, options, out);
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h
new file mode 100644
index 00000000000..4dd14e4b80c
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/json/options.h"
+#include "arrow/status.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+class MemoryPool;
+class KeyValueMetadata;
+class ResizableBuffer;
+
+namespace json {
+
+/// \brief The kinds of JSON value distinguished by the parser, together with
+/// helper functions relating a kind to names, metadata tags and data types.
+struct Kind {
+ enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject };
+
+ /// Name of a kind, as used in parse error messages
+ static const std::string& Name(Kind::type);
+
+ // NOTE(review): presumably the metadata attached to a field to record its
+ // JSON kind -- confirm in parser.cc
+ static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type);
+
+ // NOTE(review): presumably the inverse of Tag() -- confirm in parser.cc
+ static Kind::type FromTag(const std::shared_ptr<const KeyValueMetadata>& tag);
+
+ // NOTE(review): presumably stores in *kind the JSON kind used to parse
+ // values of `type` -- confirm in parser.cc
+ static Status ForType(const DataType& type, Kind::type* kind);
+};
+
+// NOTE(review): presumably the largest number of rows a single BlockParser
+// accepts -- the use site is not in this header, confirm in parser.cc
+constexpr int32_t kMaxParserNumRows = 100000;
+
+/// \class BlockParser
+/// \brief A reusable block-based parser for JSON data
+///
+/// The parser takes a block of newline delimited JSON data and extracts Arrays
+/// of unconverted strings which can be fed to a Converter to obtain a usable Array.
+///
+/// Note that in addition to parse errors (such as malformed JSON) some conversion
+/// errors are caught at parse time:
+/// - A null value in non-nullable column
+/// - Change in the JSON kind of a column. For example, if an explicit schema is provided
+/// which stipulates that field "a" is integral, a row of {"a": "not a number"} will
+/// result in an error. This also applies to fields outside an explicit schema.
+///
+/// Instances are obtained through Make(); the constructor is protected and
+/// the class is neither copyable nor assignable.
+class ARROW_EXPORT BlockParser {
+ public:
+ virtual ~BlockParser() = default;
+
+ /// \brief Reserve storage for scalars parsed from a block of json
+ /// \param[in] nbytes number of bytes of scalar storage to make available
+ virtual Status ReserveScalarStorage(int64_t nbytes) = 0;
+
+ /// \brief Parse a block of data
+ /// \param[in] json buffer holding the JSON text to parse
+ virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
+
+ /// \brief Extract parsed data
+ /// \param[out] parsed the array of parsed, unconverted data
+ virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
+
+ /// \brief Return the number of parsed rows
+ int32_t num_rows() const { return num_rows_; }
+
+ /// \brief Construct a BlockParser
+ ///
+ /// \param[in] pool MemoryPool to use when constructing parsed array
+ /// \param[in] options ParseOptions to use when parsing JSON
+ /// \param[out] out constructed BlockParser
+ static Status Make(MemoryPool* pool, const ParseOptions& options,
+ std::unique_ptr<BlockParser>* out);
+
+ /// \brief Construct a BlockParser without specifying a MemoryPool
+ ///
+ /// \param[in] options ParseOptions to use when parsing JSON
+ /// \param[out] out constructed BlockParser
+ static Status Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out);
+
+ protected:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
+
+ explicit BlockParser(MemoryPool* pool) : pool_(pool) {}
+
+ // pool handed to the constructor; not owned by the parser
+ MemoryPool* pool_;
+ // value reported by num_rows(); starts at zero
+ int32_t num_rows_ = 0;
+};
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h
new file mode 100644
index 00000000000..9ed81d000c5
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Include this file before including any RapidJSON headers.
+
+#pragma once
+
+// RapidJSON feature switches; they only take effect if defined before the
+// first RapidJSON header is included (see RapidJSON's configuration docs).
+#define RAPIDJSON_HAS_STDSTRING 1
+#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
+#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
+
+// rapidjson will be defined in namespace arrow::rapidjson
+#define RAPIDJSON_NAMESPACE arrow::rapidjson
+#define RAPIDJSON_NAMESPACE_BEGIN \
+ namespace arrow { \
+ namespace rapidjson {
+#define RAPIDJSON_NAMESPACE_END \
+ } \
+ }
+
+// enable SIMD whitespace skipping, if available
+// (SSE2 and SSE4.2 variants are both switched on by ARROW_HAVE_SSE4_2)
+#if defined(ARROW_HAVE_SSE4_2)
+#define RAPIDJSON_SSE2 1
+#define RAPIDJSON_SSE42 1
+#endif
+
+// NEON variant, switched on by ARROW_HAVE_NEON
+#if defined(ARROW_HAVE_NEON)
+#define RAPIDJSON_NEON 1
+#endif
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc
new file mode 100644
index 00000000000..51c77fa4df9
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/reader.h"
+
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/json/chunked_builder.h"
+#include "arrow/json/chunker.h"
+#include "arrow/json/converter.h"
+#include "arrow/json/parser.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/task_group.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using util::string_view;
+
+using internal::checked_cast;
+using internal::GetCpuThreadPool;
+using internal::TaskGroup;
+using internal::ThreadPool;
+
+namespace json {
+
+// TableReader implementation: blocks read from the input stream are split by
+// a Chunker, each block is parsed in a task submitted to the task group, and
+// the parsed arrays are collected by a ChunkedArrayBuilder.
+class TableReaderImpl : public TableReader,
+ public std::enable_shared_from_this<TableReaderImpl> {
+ public:
+ TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
+ const ParseOptions& parse_options,
+ std::shared_ptr<TaskGroup> task_group)
+ : pool_(pool),
+ read_options_(read_options),
+ parse_options_(parse_options),
+ chunker_(MakeChunker(parse_options_)),
+ task_group_(std::move(task_group)) {}
+
+ // Set up block_iterator_: an iterator over `input` created with
+ // read_options_.block_size, wrapped in a readahead iterator sized by the
+ // task group's parallelism.
+ Status Init(std::shared_ptr<io::InputStream> input) {
+ ARROW_ASSIGN_OR_RAISE(auto it,
+ io::MakeInputStreamIterator(input, read_options_.block_size));
+ return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
+ .Value(&block_iterator_);
+ }
+
+ // Read every block, submit one parse task per block and assemble the
+ // results into a Table. Returns Status::Invalid if the input has no blocks.
+ Result<std::shared_ptr<Table>> Read() override {
+ RETURN_NOT_OK(MakeBuilder());
+
+ ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
+ if (block == nullptr) {
+ return Status::Invalid("Empty JSON file");
+ }
+
+ // captured by each parse task, so the tasks share ownership of this reader
+ auto self = shared_from_this();
+ auto empty = std::make_shared<Buffer>("");
+
+ int64_t block_index = 0;
+ // trailing incomplete object of the previous block (none before the first)
+ std::shared_ptr<Buffer> partial = empty;
+
+ while (block != nullptr) {
+ std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+
+ // fetch one block ahead so that the final block can be recognized
+ ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+
+ if (next_block == nullptr) {
+ // End of file reached => compute completion from penultimate block
+ RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
+ } else {
+ std::shared_ptr<Buffer> starts_with_whole;
+ // Get completion of partial from previous block.
+ RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
+ &starts_with_whole));
+
+ // Get all whole objects entirely inside the current buffer
+ RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+ }
+
+ // Launch parse task
+ task_group_->Append([self, partial, completion, whole, block_index] {
+ return self->ParseAndInsert(partial, completion, whole, block_index);
+ });
+ block_index++;
+
+ partial = next_partial;
+ block = next_block;
+ }
+
+ std::shared_ptr<ChunkedArray> array;
+ RETURN_NOT_OK(builder_->Finish(&array));
+ return Table::FromChunkedStructArray(array);
+ }
+
+ private:
+ // Create builder_ for a struct of the explicit schema's fields (or an empty
+ // struct when there is no explicit schema). A promotion graph is supplied
+ // only when unexpected fields are to be inferred.
+ Status MakeBuilder() {
+ auto type = parse_options_.explicit_schema
+ ? struct_(parse_options_.explicit_schema->fields())
+ : struct_({});
+
+ auto promotion_graph =
+ parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+ ? GetPromotionGraph()
+ : nullptr;
+
+ return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
+ }
+
+ // Body of one parse task: parse the object straddling the previous and the
+ // current block (partial + completion), then the whole objects of the
+ // current block, and insert the parsed array into builder_ at block_index.
+ Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
+ const std::shared_ptr<Buffer>& completion,
+ const std::shared_ptr<Buffer>& whole, int64_t block_index) {
+ std::unique_ptr<BlockParser> parser;
+ RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
+ RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
+ whole->size()));
+
+ if (partial->size() != 0 || completion->size() != 0) {
+ std::shared_ptr<Buffer> straddling;
+ // concatenate only when both pieces are non-empty
+ if (partial->size() == 0) {
+ straddling = completion;
+ } else if (completion->size() == 0) {
+ straddling = partial;
+ } else {
+ ARROW_ASSIGN_OR_RAISE(straddling,
+ ConcatenateBuffers({partial, completion}, pool_));
+ }
+ RETURN_NOT_OK(parser->Parse(straddling));
+ }
+
+ if (whole->size() != 0) {
+ RETURN_NOT_OK(parser->Parse(whole));
+ }
+
+ std::shared_ptr<Array> parsed;
+ RETURN_NOT_OK(parser->Finish(&parsed));
+ builder_->Insert(block_index, field("", parsed->type()), parsed);
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ ReadOptions read_options_;
+ ParseOptions parse_options_;
+ std::unique_ptr<Chunker> chunker_;
+ // runs the parse tasks appended in Read()
+ std::shared_ptr<TaskGroup> task_group_;
+ // yields the blocks of the input stream; set up by Init()
+ Iterator<std::shared_ptr<Buffer>> block_iterator_;
+ // collects the parsed blocks; created by MakeBuilder()
+ std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
+
+// Deprecated Status-returning overload: forwards to the Result-returning
+// Read() and stores its value in *out.
+Status TableReader::Read(std::shared_ptr<Table>* out) { return Read().Value(out); }
+
+/// Create a TableReaderImpl whose parse tasks run on the CPU thread pool when
+/// read_options.use_threads is set and serially otherwise, then initialize it
+/// with the input stream.
+Result<std::shared_ptr<TableReader>> TableReader::Make(
+    MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+    const ReadOptions& read_options, const ParseOptions& parse_options) {
+  std::shared_ptr<TaskGroup> task_group;
+  if (read_options.use_threads) {
+    task_group = TaskGroup::MakeThreaded(GetCpuThreadPool());
+  } else {
+    task_group = TaskGroup::MakeSerial();
+  }
+  auto reader = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
+                                                  std::move(task_group));
+  RETURN_NOT_OK(reader->Init(input));
+  return reader;
+}
+
+// Deprecated Status-returning overload: forwards to the Result-returning
+// Make() and stores the created reader in *out.
+Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+ const ReadOptions& read_options,
+ const ParseOptions& parse_options,
+ std::shared_ptr<TableReader>* out) {
+ return TableReader::Make(pool, input, read_options, parse_options).Value(out);
+}
+
+/// Parse a single buffer of JSON and convert it into a RecordBatch.
+///
+/// The buffer is parsed with a BlockParser, the parsed array is passed through
+/// a serial ChunkedArrayBuilder, and the fields of the first resulting chunk
+/// become the columns of the returned batch.
+Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
+                                              std::shared_ptr<Buffer> json) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(options, &parser));
+  RETURN_NOT_OK(parser->Parse(json));
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  auto type =
+      options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
+  // a promotion graph is only supplied when unexpected fields are inferred
+  decltype(GetPromotionGraph()) promotion_graph = nullptr;
+  if (options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+    promotion_graph = GetPromotionGraph();
+  }
+  std::shared_ptr<ChunkedArrayBuilder> builder;
+  RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(),
+                                        promotion_graph, type, &builder));
+
+  builder->Insert(0, field("", type), parsed);
+  std::shared_ptr<ChunkedArray> converted_chunked;
+  RETURN_NOT_OK(builder->Finish(&converted_chunked));
+  const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0));
+
+  // gather the struct's children as the batch columns
+  const auto num_columns = converted.num_fields();
+  std::vector<std::shared_ptr<Array>> columns;
+  columns.reserve(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    columns.push_back(converted.field(i));
+  }
+  return RecordBatch::Make(schema(converted.type()->fields()), converted.length(),
+                           std::move(columns));
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h
new file mode 100644
index 00000000000..c40338c1e1c
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/json/options.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class MemoryPool;
+class Table;
+class RecordBatch;
+class Array;
+class DataType;
+
+namespace io {
+class InputStream;
+} // namespace io
+
+namespace json {
+
+/// A class that reads an entire JSON file into an Arrow Table
+///
+/// The file is expected to consist of individual line-separated JSON objects
+class ARROW_EXPORT TableReader {
+ public:
+ virtual ~TableReader() = default;
+
+ /// Read the entire JSON file and convert it to an Arrow Table
+ virtual Result<std::shared_ptr<Table>> Read() = 0;
+
+ /// Deprecated: same as Read(), with the table returned through `out`
+ ARROW_DEPRECATED("Use Result-returning version")
+ Status Read(std::shared_ptr<Table>* out);
+
+ /// Create a TableReader instance
+ ///
+ /// \param[in] pool MemoryPool handed to the reader
+ /// \param[in] input stream from which the JSON is read
+ static Result<std::shared_ptr<TableReader>> Make(MemoryPool* pool,
+ std::shared_ptr<io::InputStream> input,
+ const ReadOptions&,
+ const ParseOptions&);
+
+ /// Deprecated: same as Make(), with the reader returned through `out`
+ ARROW_DEPRECATED("Use Result-returning version")
+ static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+ const ReadOptions&, const ParseOptions&,
+ std::shared_ptr<TableReader>* out);
+};
+
+/// Parse the JSON held in a single buffer into a RecordBatch using `options`
+ARROW_EXPORT Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
+ std::shared_ptr<Buffer> json);
+
+} // namespace json
+} // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h
new file mode 100644
index 00000000000..67e2e1bb406
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Forward declarations of the public arrow::json types, for headers which
+// only need to name them.
+namespace arrow {
+namespace json {
+
+class TableReader;
+struct ReadOptions;
+struct ParseOptions;
+
+}  // namespace json
+}  // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h b/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h
new file mode 100644
index 00000000000..addded94943
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <bitset>
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/compare.h"
+#include "arrow/util/functional.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string_builder.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/type_traits.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace internal {
+
+/// \brief Store a stack of bitsets efficiently. The top bitset may be
+/// accessed and its bits may be modified, but it may not be resized.
+///
+/// All bitsets share one std::vector<bool>; offsets_ records where each
+/// bitset begins inside it. Pop() and operator[] require a non-empty stack,
+/// which is checked with assert() in debug builds.
+class BitsetStack {
+ public:
+  using reference = typename std::vector<bool>::reference;
+
+  /// \brief push a bitset onto the stack
+  /// \param size number of bits in the next bitset (must not be negative)
+  /// \param value initial value for bits in the pushed bitset
+  void Push(int size, bool value) {
+    assert(size >= 0);
+    offsets_.push_back(bit_count());
+    bits_.resize(bit_count() + size, value);
+  }
+
+  /// \brief number of bits in the bitset at the top of the stack
+  /// \return 0 if the stack is empty
+  int TopSize() const {
+    if (offsets_.empty()) return 0;
+    return bit_count() - offsets_.back();
+  }
+
+  /// \brief pop a bitset off the stack
+  ///
+  /// The stack must not be empty.
+  void Pop() {
+    assert(!offsets_.empty());
+    bits_.resize(offsets_.back());
+    offsets_.pop_back();
+  }
+
+  /// \brief get the value of a bit in the top bitset
+  /// \param i index of the bit to access; must lie in [0, TopSize()), the
+  /// upper bound is not checked
+  bool operator[](int i) const {
+    assert(!offsets_.empty() && i >= 0);
+    return bits_[offsets_.back() + i];
+  }
+
+  /// \brief get a mutable reference to a bit in the top bitset
+  /// \param i index of the bit to access; must lie in [0, TopSize()), the
+  /// upper bound is not checked
+  reference operator[](int i) {
+    assert(!offsets_.empty() && i >= 0);
+    return bits_[offsets_.back() + i];
+  }
+
+ private:
+  /// total number of bits held by all bitsets on the stack
+  int bit_count() const { return static_cast<int>(bits_.size()); }
+  // bits of every bitset on the stack, stored back to back
+  std::vector<bool> bits_;
+  // index in bits_ at which each bitset on the stack begins
+  std::vector<int> offsets_;
+};
+
+} // namespace internal
+} // namespace arrow