diff options
author | heretic <[email protected]> | 2022-09-01 11:18:57 +0300 |
---|---|---|
committer | heretic <[email protected]> | 2022-09-01 11:18:57 +0300 |
commit | 8393683e8cb62468ccace14fa3379e3a4fbdde73 (patch) | |
tree | 4f2d32a77665019c9491d34dbe1cc5e605bb220c /contrib/libs/apache/arrow/cpp | |
parent | 836e587fc927c87149f8f0b2676d2587e6a79111 (diff) |
add apache arrow python
Diffstat (limited to 'contrib/libs/apache/arrow/cpp')
31 files changed, 7481 insertions, 0 deletions
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc new file mode 100644 index 00000000000..4f44e24ba6d --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.cc @@ -0,0 +1,761 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#include <sstream>
#include <utility>

#include "arrow/util/config.h"

#include "arrow/filesystem/filesystem.h"
// NOTE(review): in this copy, defining ARROW_HDFS or ARROW_S3 hits an #error
// directive instead of including the backend header, so such a build stops
// here. Presumably intentional for this vendored tree -- confirm.
#ifdef ARROW_HDFS
#error #include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
#error #include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/slow.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/parallel.h"
#include "arrow/util/uri.h"
#include "arrow/util/vector.h"
#include "arrow/util/windows_fixup.h"

namespace arrow {

using internal::checked_pointer_cast;
using internal::TaskHints;
using internal::Uri;
using io::internal::SubmitIO;

namespace fs {

using internal::ConcatAbstractPath;
using internal::EnsureTrailingSlash;
using internal::GetAbstractPathParent;
using internal::kSep;
using internal::RemoveLeadingSlash;
using internal::RemoveTrailingSlash;
using internal::ToSlashes;

/// Return a short lowercase name for `ftype` ("not-found", "unknown", "file",
/// "directory").
///
/// Any other value is reported through ARROW_LOG(FATAL); the "???" return that
/// follows only matters if that log statement returns.
std::string ToString(FileType ftype) {
  switch (ftype) {
    case FileType::NotFound:
      return "not-found";
    case FileType::Unknown:
      return "unknown";
    case FileType::File:
      return "file";
    case FileType::Directory:
      return "directory";
    default:
      ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
      return "???";
  }
}

// For googletest
/// Stream `ftype` as "FileType::<EnumeratorName>".
///
/// Unlike ToString(FileType), the text is the C++ enumerator spelling, which is
/// produced by stringifying the enumerator name inside the local macro below.
ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType ftype) {
// Expands to one `case` that writes the enumerator's own name; the macro is
// undefined again before the function returns so it does not leak.
#define FILE_TYPE_CASE(value_name)                  \
  case FileType::value_name:                        \
    os << "FileType::" ARROW_STRINGIFY(value_name); \
    break;

  switch (ftype) {
    FILE_TYPE_CASE(NotFound)
    FILE_TYPE_CASE(Unknown)
    FILE_TYPE_CASE(File)
    FILE_TYPE_CASE(Directory)
    default:
      ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
  }

#undef FILE_TYPE_CASE
  return os;
}

/// Second element of GetAbstractPathParent(path_), i.e. the last path component.
std::string FileInfo::base_name() const {
  return internal::GetAbstractPathParent(path_).second;
}

/// First element of GetAbstractPathParent(path_), i.e. everything before the
/// last path component.
std::string FileInfo::dir_name() const {
  return internal::GetAbstractPathParent(path_).first;
}

// Debug helper
/// Render this entry exactly as the FileInfo stream operator does.
std::string FileInfo::ToString() const {
  std::stringstream os;
  os << *this;
  return os.str();
}

/// Stream `info` as "FileInfo(<type>, <path>)"; size and mtime are not printed.
std::ostream& operator<<(std::ostream& os, const FileInfo& info) {
  return os << "FileInfo(" << info.type() << ", " << info.path() << ")";
}

/// Extension of path_, as computed by internal::GetAbstractPathExtension.
std::string FileInfo::extension() const {
  return internal::GetAbstractPathExtension(path_);
}

//////////////////////////////////////////////////////////////////////////
// FileSystem default method implementations

FileSystem::~FileSystem() {}

/// Default normalization: return `path` unchanged.
Result<std::string> FileSystem::NormalizePath(std::string path) { return path; }

/// Default multi-path lookup: call the single-path GetFileInfo() for each
/// entry of `paths`, in order.
///
/// \return one FileInfo per input path, in input order; the first failing
/// lookup is propagated and later paths are not queried.
Result<std::vector<FileInfo>> FileSystem::GetFileInfo(
    const std::vector<std::string>& paths) {
  std::vector<FileInfo> res;
  res.reserve(paths.size());
  for (const auto& path : paths) {
    ARROW_ASSIGN_OR_RAISE(FileInfo info, GetFileInfo(path));
    res.push_back(std::move(info));
  }
  return res;
}

namespace {

/// Run `func(self)` either inline or on the filesystem's IO context.
///
/// \param fs          filesystem the call is made for; a shared_ptr obtained
///                    from fs->shared_from_this() is handed to `func`
/// \param synchronous when true, `func` is invoked directly on the calling
///                    thread; otherwise it is submitted through SubmitIO()
/// \param func        callable taking a std::shared_ptr<FileSystem>
/// \return the type produced by DeferNotOk() on a submitted call (see the
///         trailing return type); the inline result is converted to it
template <typename DeferredFunc>
auto FileSystemDefer(FileSystem* fs, bool synchronous, DeferredFunc&& func)
    -> decltype(DeferNotOk(
        fs->io_context().executor()->Submit(func, std::shared_ptr<FileSystem>{}))) {
  auto self = fs->shared_from_this();
  if (synchronous) {
    return std::forward<DeferredFunc>(func)(std::move(self));
  }
  return DeferNotOk(io::internal::SubmitIO(
      fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
}

}  // namespace

/// Default async lookup: defer the synchronous multi-path GetFileInfo().
///
/// `paths` is captured by copy, so the caller's vector need not outlive the
/// call. Whether the work runs inline is decided by default_async_is_sync_.
Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(
    const std::vector<std::string>& paths) {
  return FileSystemDefer(
      this, default_async_is_sync_,
      [paths](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(paths); });
}

/// Default generator: defer the synchronous selector-based GetFileInfo() and
/// wrap its single future with MakeSingleFutureGenerator().
FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
  auto fut = FileSystemDefer(
      this, default_async_is_sync_,
      [select](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(select); });
  return MakeSingleFutureGenerator(std::move(fut));
}

/// Default bulk delete: call DeleteFile() on every path, one after another.
///
/// The loop does not stop on failure: every path is attempted and the
/// individual statuses are folded together with Status::operator&=.
Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
  Status st = Status::OK();
  for (const auto& path : paths) {
    st &= DeleteFile(path);
  }
  return st;
}

namespace {

/// Check that `info` may be opened for reading.
///
/// \return PathNotFound for FileType::NotFound, NotAFile for any type other
///         than File or Unknown, OK otherwise (Unknown is let through).
Status ValidateInputFileInfo(const FileInfo& info) {
  if (info.type() == FileType::NotFound) {
    return internal::PathNotFound(info.path());
  }
  if (info.type() != FileType::File && info.type() != FileType::Unknown) {
    return internal::NotAFile(info.path());
  }
  return Status::OK();
}

}  // namespace

/// Default FileInfo overload: validate the type, then open by path.
Result<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStream(
    const FileInfo& info) {
  RETURN_NOT_OK(ValidateInputFileInfo(info));
  return OpenInputStream(info.path());
}

/// Default FileInfo overload: validate the type, then open by path.
Result<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFile(
    const FileInfo& info) {
  RETURN_NOT_OK(ValidateInputFileInfo(info));
  return OpenInputFile(info.path());
}

/// Default async open: defer the synchronous OpenInputStream(path).
Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
    const std::string& path) {
  return FileSystemDefer(
      this, default_async_is_sync_,
      [path](std::shared_ptr<FileSystem> self) { return self->OpenInputStream(path); });
}

/// Default async open from a FileInfo.
///
/// The type check runs before anything is deferred, so an invalid `info` is
/// returned from this call directly rather than from deferred work.
Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
    const FileInfo& info) {
  RETURN_NOT_OK(ValidateInputFileInfo(info));
  return FileSystemDefer(
      this, default_async_is_sync_,
      [info](std::shared_ptr<FileSystem> self) { return self->OpenInputStream(info); });
}

/// Default async open: defer the synchronous OpenInputFile(path).
Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
    const std::string& path) {
  return FileSystemDefer(
      this, default_async_is_sync_,
      [path](std::shared_ptr<FileSystem> self) { return self->OpenInputFile(path); });
}

/// Default async open from a FileInfo; validation runs before deferring, as in
/// OpenInputStreamAsync(const FileInfo&).
Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
    const FileInfo& info) {
  RETURN_NOT_OK(ValidateInputFileInfo(info));
  return FileSystemDefer(
      this, default_async_is_sync_,
      [info](std::shared_ptr<FileSystem> self) { return self->OpenInputFile(info); });
}

/// Convenience overload: open for writing with a null metadata pointer.
Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenOutputStream(
    const std::string& path) {
  return OpenOutputStream(path, std::shared_ptr<const KeyValueMetadata>{});
}

/// Convenience overload: open for appending with a null metadata pointer.
Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenAppendStream(
    const std::string& path) {
  return OpenAppendStream(path, std::shared_ptr<const KeyValueMetadata>{});
}

//////////////////////////////////////////////////////////////////////////
// SubTreeFileSystem implementation

/// Build a view of `base_fs` rooted at `base_path`.
///
/// The IO context is taken from `base_fs`. The base path is normalized with
/// NormalizeBasePath(); a normalization failure cannot be returned from a
/// constructor and goes through ValueOrDie() instead.
SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
                                     std::shared_ptr<FileSystem> base_fs)
    : FileSystem(base_fs->io_context()),
      base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()),
      base_fs_(base_fs) {}

SubTreeFileSystem::~SubTreeFileSystem() {}

/// Normalize `base_path` with the base filesystem, then pass the result
/// through EnsureTrailingSlash().
Result<std::string> SubTreeFileSystem::NormalizeBasePath(
    std::string base_path, const std::shared_ptr<FileSystem>& base_fs) {
  ARROW_ASSIGN_OR_RAISE(base_path, base_fs->NormalizePath(std::move(base_path)));
  return EnsureTrailingSlash(std::move(base_path));
}

/// Two subtree filesystems are equal when they are the same object, or when
/// they share a type name, have identical base paths and equal base
/// filesystems.
bool SubTreeFileSystem::Equals(const FileSystem& other) const {
  if (this == &other) {
    return true;
  }
  if (other.type_name() != type_name()) {
    return false;
  }
  // The type_name() comparison above is what justifies this downcast.
  const auto& subfs = ::arrow::internal::checked_cast<const SubTreeFileSystem&>(other);
  return base_path_ == subfs.base_path_ && base_fs_->Equals(subfs.base_fs_);
}

/// Map a subtree-relative path to a base-filesystem path.
///
/// An empty `s` maps to base_path_ itself, i.e. the subtree root.
std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
  if (s.empty()) {
    return base_path_;
  } else {
    return ConcatAbstractPath(base_path_, s);
  }
}

/// In-place variant of PrependBase() for operations that must not target the
/// subtree root.
///
/// \param[in,out] s subtree-relative path, rewritten to a base-filesystem path
/// \return IOError("Empty path") when `*s` is empty (`*s` is left untouched),
///         OK otherwise
Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
  if (s->empty()) {
    return Status::IOError("Empty path");
  } else {
    *s = ConcatAbstractPath(base_path_, *s);
    return Status::OK();
  }
}

+Result<std::string> SubTreeFileSystem::StripBase(const std::string& s) const { + auto len = base_path_.length(); + // Note base_path_ ends with a slash (if not empty) + if (s.length() >= len && s.substr(0, len) == base_path_) { + return s.substr(len); + } else { + return Status::UnknownError("Underlying filesystem returned path '", s, + "', which is not a subpath of '", base_path_, "'"); + } +} + +Status SubTreeFileSystem::FixInfo(FileInfo* info) const { + ARROW_ASSIGN_OR_RAISE(auto fixed_path, StripBase(info->path())); + info->set_path(std::move(fixed_path)); + return Status::OK(); +} + +Result<std::string> SubTreeFileSystem::NormalizePath(std::string path) { + ARROW_ASSIGN_OR_RAISE(auto normalized, base_fs_->NormalizePath(PrependBase(path))); + return StripBase(std::move(normalized)); +} + +Result<FileInfo> SubTreeFileSystem::GetFileInfo(const std::string& path) { + ARROW_ASSIGN_OR_RAISE(FileInfo info, base_fs_->GetFileInfo(PrependBase(path))); + RETURN_NOT_OK(FixInfo(&info)); + return info; +} + +Result<std::vector<FileInfo>> SubTreeFileSystem::GetFileInfo(const FileSelector& select) { + auto selector = select; + selector.base_dir = PrependBase(selector.base_dir); + ARROW_ASSIGN_OR_RAISE(auto infos, base_fs_->GetFileInfo(selector)); + for (auto& info : infos) { + RETURN_NOT_OK(FixInfo(&info)); + } + return infos; +} + +FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) { + auto selector = select; + selector.base_dir = PrependBase(selector.base_dir); + auto gen = base_fs_->GetFileInfoGenerator(selector); + + auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this()); + + std::function<Result<std::vector<FileInfo>>(const std::vector<FileInfo>& infos)> + fix_infos = [self](std::vector<FileInfo> infos) -> Result<std::vector<FileInfo>> { + for (auto& info : infos) { + RETURN_NOT_OK(self->FixInfo(&info)); + } + return infos; + }; + return MakeMappedGenerator(gen, fix_infos); +} + +Status 
SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->CreateDir(s, recursive); +} + +Status SubTreeFileSystem::DeleteDir(const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->DeleteDir(s); +} + +Status SubTreeFileSystem::DeleteDirContents(const std::string& path) { + if (internal::IsEmptyPath(path)) { + return internal::InvalidDeleteDirContents(path); + } + auto s = PrependBase(path); + return base_fs_->DeleteDirContents(s); +} + +Status SubTreeFileSystem::DeleteRootDirContents() { + if (base_path_.empty()) { + return base_fs_->DeleteRootDirContents(); + } else { + return base_fs_->DeleteDirContents(base_path_); + } +} + +Status SubTreeFileSystem::DeleteFile(const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->DeleteFile(s); +} + +Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) { + auto s = src; + auto d = dest; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + RETURN_NOT_OK(PrependBaseNonEmpty(&d)); + return base_fs_->Move(s, d); +} + +Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) { + auto s = src; + auto d = dest; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + RETURN_NOT_OK(PrependBaseNonEmpty(&d)); + return base_fs_->CopyFile(s, d); +} + +Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream( + const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenInputStream(s); +} + +Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputStream(new_info); +} + +Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync( + 
const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenInputStreamAsync(s); +} + +Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputStreamAsync(new_info); +} + +Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile( + const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenInputFile(s); +} + +Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputFile(new_info); +} + +Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync( + const std::string& path) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenInputFileAsync(s); +} + +Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputFileAsync(new_info); +} + +Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenOutputStream(s, metadata); +} + +Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenAppendStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + auto s = path; + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + return base_fs_->OpenAppendStream(s, metadata); +} + 
+////////////////////////////////////////////////////////////////////////// +// SlowFileSystem implementation + +SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs, + std::shared_ptr<io::LatencyGenerator> latencies) + : FileSystem(base_fs->io_context()), base_fs_(base_fs), latencies_(latencies) {} + +SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs, + double average_latency) + : FileSystem(base_fs->io_context()), + base_fs_(base_fs), + latencies_(io::LatencyGenerator::Make(average_latency)) {} + +SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs, + double average_latency, int32_t seed) + : FileSystem(base_fs->io_context()), + base_fs_(base_fs), + latencies_(io::LatencyGenerator::Make(average_latency, seed)) {} + +bool SlowFileSystem::Equals(const FileSystem& other) const { return this == &other; } + +Result<FileInfo> SlowFileSystem::GetFileInfo(const std::string& path) { + latencies_->Sleep(); + return base_fs_->GetFileInfo(path); +} + +Result<std::vector<FileInfo>> SlowFileSystem::GetFileInfo(const FileSelector& selector) { + latencies_->Sleep(); + return base_fs_->GetFileInfo(selector); +} + +Status SlowFileSystem::CreateDir(const std::string& path, bool recursive) { + latencies_->Sleep(); + return base_fs_->CreateDir(path, recursive); +} + +Status SlowFileSystem::DeleteDir(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteDir(path); +} + +Status SlowFileSystem::DeleteDirContents(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteDirContents(path); +} + +Status SlowFileSystem::DeleteRootDirContents() { + latencies_->Sleep(); + return base_fs_->DeleteRootDirContents(); +} + +Status SlowFileSystem::DeleteFile(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteFile(path); +} + +Status SlowFileSystem::Move(const std::string& src, const std::string& dest) { + latencies_->Sleep(); + return base_fs_->Move(src, dest); +} + +Status 
SlowFileSystem::CopyFile(const std::string& src, const std::string& dest) { + latencies_->Sleep(); + return base_fs_->CopyFile(src, dest); +} + +Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream( + const std::string& path) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto stream, base_fs_->OpenInputStream(path)); + return std::make_shared<io::SlowInputStream>(stream, latencies_); +} + +Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream( + const FileInfo& info) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto stream, base_fs_->OpenInputStream(info)); + return std::make_shared<io::SlowInputStream>(stream, latencies_); +} + +Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile( + const std::string& path) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto file, base_fs_->OpenInputFile(path)); + return std::make_shared<io::SlowRandomAccessFile>(file, latencies_); +} + +Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile( + const FileInfo& info) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto file, base_fs_->OpenInputFile(info)); + return std::make_shared<io::SlowRandomAccessFile>(file, latencies_); +} + +Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenOutputStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + latencies_->Sleep(); + // XXX Should we have a SlowOutputStream that waits on Flush() and Close()? 
+ return base_fs_->OpenOutputStream(path, metadata); +} + +Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + latencies_->Sleep(); + return base_fs_->OpenAppendStream(path, metadata); +} + +Status CopyFiles(const std::vector<FileLocator>& sources, + const std::vector<FileLocator>& destinations, + const io::IOContext& io_context, int64_t chunk_size, bool use_threads) { + if (sources.size() != destinations.size()) { + return Status::Invalid("Trying to copy ", sources.size(), " files into ", + destinations.size(), " paths."); + } + + auto copy_one_file = [&](int i) { + if (sources[i].filesystem->Equals(destinations[i].filesystem)) { + return sources[i].filesystem->CopyFile(sources[i].path, destinations[i].path); + } + + ARROW_ASSIGN_OR_RAISE(auto source, + sources[i].filesystem->OpenInputStream(sources[i].path)); + ARROW_ASSIGN_OR_RAISE(const auto metadata, source->ReadMetadata()); + + ARROW_ASSIGN_OR_RAISE(auto destination, destinations[i].filesystem->OpenOutputStream( + destinations[i].path, metadata)); + RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size, io_context)); + return destination->Close(); + }; + + return ::arrow::internal::OptionalParallelFor( + use_threads, static_cast<int>(sources.size()), std::move(copy_one_file), + io_context.executor()); +} + +Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs, + const FileSelector& source_sel, + const std::shared_ptr<FileSystem>& destination_fs, + const std::string& destination_base_dir, const io::IOContext& io_context, + int64_t chunk_size, bool use_threads) { + ARROW_ASSIGN_OR_RAISE(auto source_infos, source_fs->GetFileInfo(source_sel)); + if (source_infos.empty()) { + return Status::OK(); + } + + std::vector<FileLocator> sources, destinations; + std::vector<std::string> dirs; + + for (const FileInfo& source_info : source_infos) { + auto relative = 
internal::RemoveAncestor(source_sel.base_dir, source_info.path()); + if (!relative.has_value()) { + return Status::Invalid("GetFileInfo() yielded path '", source_info.path(), + "', which is outside base dir '", source_sel.base_dir, "'"); + } + + auto destination_path = + internal::ConcatAbstractPath(destination_base_dir, relative->to_string()); + + if (source_info.IsDirectory()) { + dirs.push_back(destination_path); + } else if (source_info.IsFile()) { + sources.push_back({source_fs, source_info.path()}); + destinations.push_back({destination_fs, destination_path}); + } + } + + auto create_one_dir = [&](int i) { return destination_fs->CreateDir(dirs[i]); }; + + dirs = internal::MinimalCreateDirSet(std::move(dirs)); + RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( + use_threads, static_cast<int>(dirs.size()), std::move(create_one_dir), + io_context.executor())); + + return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); +} + +namespace { + +Result<Uri> ParseFileSystemUri(const std::string& uri_string) { + Uri uri; + auto status = uri.Parse(uri_string); + if (!status.ok()) { +#ifdef _WIN32 + // Could be a "file:..." URI with backslashes instead of regular slashes. 
+ RETURN_NOT_OK(uri.Parse(ToSlashes(uri_string))); + if (uri.scheme() != "file") { + return status; + } +#else + return status; +#endif + } + return std::move(uri); +} + +Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri, + const std::string& uri_string, + const io::IOContext& io_context, + std::string* out_path) { + const auto scheme = uri.scheme(); + + if (scheme == "file") { + std::string path; + ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path)); + if (out_path != nullptr) { + *out_path = path; + } + return std::make_shared<LocalFileSystem>(options, io_context); + } + if (scheme == "hdfs" || scheme == "viewfs") { +#ifdef ARROW_HDFS + ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri)); + if (out_path != nullptr) { + *out_path = uri.path(); + } + ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options, io_context)); + return hdfs; +#else + return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support"); +#endif + } + if (scheme == "s3") { +#ifdef ARROW_S3 + RETURN_NOT_OK(EnsureS3Initialized()); + ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); + ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); + return s3fs; +#else + return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support"); +#endif + } + + if (scheme == "mock") { + // MockFileSystem does not have an absolute / relative path distinction, + // normalize path by removing leading slash. 
+ if (out_path != nullptr) { + *out_path = std::string(RemoveLeadingSlash(uri.path())); + } + return std::make_shared<internal::MockFileSystem>(internal::CurrentTimePoint(), + io_context); + } + + return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string); +} + +} // namespace + +Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string, + std::string* out_path) { + return FileSystemFromUri(uri_string, io::default_io_context(), out_path); +} + +Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string, + const io::IOContext& io_context, + std::string* out_path) { + ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string)); + return FileSystemFromUriReal(fsuri, uri_string, io_context, out_path); +} + +Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const std::string& uri_string, + std::string* out_path) { + return FileSystemFromUriOrPath(uri_string, io::default_io_context(), out_path); +} + +Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath( + const std::string& uri_string, const io::IOContext& io_context, + std::string* out_path) { + if (internal::DetectAbsolutePath(uri_string)) { + // Normalize path separators + if (out_path != nullptr) { + *out_path = ToSlashes(uri_string); + } + return std::make_shared<LocalFileSystem>(); + } + return FileSystemFromUri(uri_string, io_context, out_path); +} + +Status FileSystemFromUri(const std::string& uri, std::shared_ptr<FileSystem>* out_fs, + std::string* out_path) { + return FileSystemFromUri(uri, out_path).Value(out_fs); +} + +Status Initialize(const FileSystemGlobalOptions& options) { + internal::global_options = options; + return Status::OK(); +} + +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h new file mode 100644 index 00000000000..c739471c725 --- /dev/null +++ 
b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/filesystem.h @@ -0,0 +1,532 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <chrono> +#include <cstdint> +#include <functional> +#include <iosfwd> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/filesystem/type_fwd.h" +#include "arrow/io/interfaces.h" +#include "arrow/type_fwd.h" +#include "arrow/util/compare.h" +#include "arrow/util/macros.h" +#include "arrow/util/type_fwd.h" +#include "arrow/util/visibility.h" +#include "arrow/util/windows_fixup.h" + +namespace arrow { +namespace fs { + +// A system clock time point expressed as a 64-bit (or more) number of +// nanoseconds since the epoch. 
+using TimePoint = + std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>; + +ARROW_EXPORT std::string ToString(FileType); + +ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType); + +static const int64_t kNoSize = -1; +static const TimePoint kNoTime = TimePoint(TimePoint::duration(-1)); + +/// \brief FileSystem entry info +struct ARROW_EXPORT FileInfo : public util::EqualityComparable<FileInfo> { + FileInfo() = default; + FileInfo(FileInfo&&) = default; + FileInfo& operator=(FileInfo&&) = default; + FileInfo(const FileInfo&) = default; + FileInfo& operator=(const FileInfo&) = default; + + explicit FileInfo(std::string path, FileType type = FileType::Unknown) + : path_(std::move(path)), type_(type) {} + + /// The file type + FileType type() const { return type_; } + void set_type(FileType type) { type_ = type; } + + /// The full file path in the filesystem + const std::string& path() const { return path_; } + void set_path(std::string path) { path_ = std::move(path); } + + /// The file base name (component after the last directory separator) + std::string base_name() const; + + // The directory base name (component before the file base name). + std::string dir_name() const; + + /// The size in bytes, if available + /// + /// Only regular files are guaranteed to have a size. 
+ int64_t size() const { return size_; } + void set_size(int64_t size) { size_ = size; } + + /// The file extension (excluding the dot) + std::string extension() const; + + /// The time of last modification, if available + TimePoint mtime() const { return mtime_; } + void set_mtime(TimePoint mtime) { mtime_ = mtime; } + + bool IsFile() const { return type_ == FileType::File; } + bool IsDirectory() const { return type_ == FileType::Directory; } + + bool Equals(const FileInfo& other) const { + return type() == other.type() && path() == other.path() && size() == other.size() && + mtime() == other.mtime(); + } + + std::string ToString() const; + + /// Function object implementing less-than comparison and hashing by + /// path, to support sorting infos, using them as keys, and other + /// interactions with the STL. + struct ByPath { + bool operator()(const FileInfo& l, const FileInfo& r) const { + return l.path() < r.path(); + } + + size_t operator()(const FileInfo& i) const { + return std::hash<std::string>{}(i.path()); + } + }; + + protected: + std::string path_; + FileType type_ = FileType::Unknown; + int64_t size_ = kNoSize; + TimePoint mtime_ = kNoTime; +}; + +ARROW_EXPORT std::ostream& operator<<(std::ostream& os, const FileInfo&); + +/// \brief File selector for filesystem APIs +struct ARROW_EXPORT FileSelector { + /// The directory in which to select files. + /// If the path exists but doesn't point to a directory, this should be an error. + std::string base_dir; + /// The behavior if `base_dir` isn't found in the filesystem. If false, + /// an error is returned. If true, an empty selection is returned. + bool allow_not_found; + /// Whether to recurse into subdirectories. + bool recursive; + /// The maximum number of subdirectories to recurse into. 
+ int32_t max_recursion; + + FileSelector() : allow_not_found(false), recursive(false), max_recursion(INT32_MAX) {} +}; + +/// \brief FileSystem, path pair +struct ARROW_EXPORT FileLocator { + std::shared_ptr<FileSystem> filesystem; + std::string path; +}; + +using FileInfoVector = std::vector<FileInfo>; +using FileInfoGenerator = std::function<Future<FileInfoVector>()>; + +} // namespace fs + +template <> +struct IterationTraits<fs::FileInfoVector> { + static fs::FileInfoVector End() { return {}; } + static bool IsEnd(const fs::FileInfoVector& val) { return val.empty(); } +}; + +namespace fs { + +/// \brief Abstract file system API +class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> { + public: + virtual ~FileSystem(); + + virtual std::string type_name() const = 0; + + /// EXPERIMENTAL: The IOContext associated with this filesystem. + const io::IOContext& io_context() const { return io_context_; } + + /// Normalize path for the given filesystem + /// + /// The default implementation of this method is a no-op, but subclasses + /// may allow normalizing irregular path forms (such as Windows local paths). + virtual Result<std::string> NormalizePath(std::string path); + + virtual bool Equals(const FileSystem& other) const = 0; + + virtual bool Equals(const std::shared_ptr<FileSystem>& other) const { + return Equals(*other); + } + + /// Get info for the given target. + /// + /// Any symlink is automatically dereferenced, recursively. + /// A nonexistent or unreachable file returns an Ok status and + /// has a FileType of value NotFound. An error status indicates + /// a truly exceptional condition (low-level I/O error, etc.). + virtual Result<FileInfo> GetFileInfo(const std::string& path) = 0; + /// Same, for many targets at once. + virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths); + /// Same, according to a selector. 
+ /// + /// The selector's base directory will not be part of the results, even if + /// it exists. + /// If it doesn't exist, see `FileSelector::allow_not_found`. + virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select) = 0; + + /// EXPERIMENTAL: async version of GetFileInfo + virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths); + + /// EXPERIMENTAL: streaming async version of GetFileInfo + /// + /// The returned generator is not async-reentrant, i.e. you need to wait for + /// the returned future to complete before calling the generator again. + virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select); + + /// Create a directory and subdirectories. + /// + /// This function succeeds if the directory already exists. + virtual Status CreateDir(const std::string& path, bool recursive = true) = 0; + + /// Delete a directory and its contents, recursively. + virtual Status DeleteDir(const std::string& path) = 0; + + /// Delete a directory's contents, recursively. + /// + /// Like DeleteDir, but doesn't delete the directory itself. + /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents. + virtual Status DeleteDirContents(const std::string& path) = 0; + + /// EXPERIMENTAL: Delete the root directory's contents, recursively. + /// + /// Implementations may decide to raise an error if this operation is + /// too dangerous. + // NOTE: may decide to remove this if it's deemed not useful + virtual Status DeleteRootDirContents() = 0; + + /// Delete a file. + virtual Status DeleteFile(const std::string& path) = 0; + /// Delete many files. + /// + /// The default implementation issues individual delete operations in sequence. + virtual Status DeleteFiles(const std::vector<std::string>& paths); + + /// Move / rename a file or directory. 
+ /// + /// If the destination exists: + /// - if it is a non-empty directory, an error is returned + /// - otherwise, if it has the same type as the source, it is replaced + /// - otherwise, behavior is unspecified (implementation-dependent). + virtual Status Move(const std::string& src, const std::string& dest) = 0; + + /// Copy a file. + /// + /// If the destination exists and is a directory, an error is returned. + /// Otherwise, it is replaced. + virtual Status CopyFile(const std::string& src, const std::string& dest) = 0; + + /// Open an input stream for sequential reading. + virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream( + const std::string& path) = 0; + /// Open an input stream for sequential reading. + /// + /// This override assumes the given FileInfo validly represents the file's + /// characteristics, and may optimize access depending on them (for example + /// avoid querying the file size or its existence). + virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info); + + /// Open an input file for random access reading. + virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( + const std::string& path) = 0; + /// Open an input file for random access reading. + /// + /// This override assumes the given FileInfo validly represents the file's + /// characteristics, and may optimize access depending on them (for example + /// avoid querying the file size or its existence). 
+ virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( + const FileInfo& info); + + /// EXPERIMENTAL: async version of OpenInputStream + virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync( + const std::string& path); + /// EXPERIMENTAL: async version of OpenInputStream + virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync( + const FileInfo& info); + + /// EXPERIMENTAL: async version of OpenInputFile + virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync( + const std::string& path); + /// EXPERIMENTAL: async version of OpenInputFile + virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync( + const FileInfo& info); + + /// Open an output stream for sequential writing. + /// + /// If the target already exists, existing data is truncated. + virtual Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( + const std::string& path, + const std::shared_ptr<const KeyValueMetadata>& metadata) = 0; + Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path); + + /// Open an output stream for appending. + /// + /// If the target doesn't exist, a new empty file is created. + virtual Result<std::shared_ptr<io::OutputStream>> OpenAppendStream( + const std::string& path, + const std::shared_ptr<const KeyValueMetadata>& metadata) = 0; + Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path); + + protected: + explicit FileSystem(const io::IOContext& io_context = io::default_io_context()) + : io_context_(io_context) {} + + io::IOContext io_context_; + // Whether metadata operations (such as GetFileInfo or OpenInputStream) + // are cheap enough that the default async variants don't bother with + // a thread pool. + bool default_async_is_sync_ = true; +}; + +/// \brief A FileSystem implementation that delegates to another +/// implementation after prepending a fixed base path. 
+///
+/// This is useful to expose a logical view of a subtree of a filesystem,
+/// for example a directory in a LocalFileSystem.
+/// This works on abstract paths, i.e. paths using forward slashes and
+/// a single root "/". Windows paths are not guaranteed to work.
+/// This makes no security guarantee. For example, symlinks may make it
+/// possible to "escape" the subtree and access other parts of the
+/// underlying filesystem.
+class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
+ public:
+  /// Create a view of `base_fs` rooted at `base_path`.
+  ///
+  /// This constructor may abort if base_path is invalid.
+  explicit SubTreeFileSystem(const std::string& base_path,
+                             std::shared_ptr<FileSystem> base_fs);
+  ~SubTreeFileSystem() override;
+
+  std::string type_name() const override { return "subtree"; }
+  /// The stored base path (returned by value, i.e. as a copy).
+  std::string base_path() const { return base_path_; }
+  /// The filesystem this view delegates to.
+  std::shared_ptr<FileSystem> base_fs() const { return base_fs_; }
+
+  Result<std::string> NormalizePath(std::string path) override;
+
+  bool Equals(const FileSystem& other) const override;
+
+  // Re-expose the base class GetFileInfo overloads that the overrides below
+  // would otherwise hide.
+  /// \cond FALSE
+  using FileSystem::GetFileInfo;
+  /// \endcond
+  Result<FileInfo> GetFileInfo(const std::string& path) override;
+  Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+  FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+
+  Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  Status DeleteDir(const std::string& path) override;
+  Status DeleteDirContents(const std::string& path) override;
+  Status DeleteRootDirContents() override;
+
+  Status DeleteFile(const std::string& path) override;
+
+  Status Move(const std::string& src, const std::string& dest) override;
+
+  Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+      const std::string& path) override;
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
+  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const std::string& path) override;
+  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const FileInfo& info) override;
+
+  Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const std::string& path) override;
+  Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const FileInfo& info) override;
+  Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const std::string& path) override;
+  Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const FileInfo& info) override;
+
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+  // Only reachable from subclasses; leaves base_path_ empty and base_fs_ null.
+  SubTreeFileSystem() {}
+
+  // Const: fixed for the lifetime of the object once constructed.
+  const std::string base_path_;
+  std::shared_ptr<FileSystem> base_fs_;
+
+  // NOTE(review): the helpers below are defined outside this header. Judging
+  // by their names they translate paths and FileInfo entries between the
+  // subtree view and base_fs_ -- confirm against the implementation.
+  std::string PrependBase(const std::string& s) const;
+  Status PrependBaseNonEmpty(std::string* s) const;
+  Result<std::string> StripBase(const std::string& s) const;
+  Status FixInfo(FileInfo* info) const;
+
+  static Result<std::string> NormalizeBasePath(
+      std::string base_path, const std::shared_ptr<FileSystem>& base_fs);
+};
+
+/// \brief A FileSystem implementation that delegates to another
+/// implementation but inserts latencies at various points.
+class ARROW_EXPORT SlowFileSystem : public FileSystem {
+ public:
+  /// Wrap `base_fs`, using `latencies` as the latency generator.
+  SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
+                 std::shared_ptr<io::LatencyGenerator> latencies);
+  /// Wrap `base_fs` with the given average latency.
+  ///
+  /// NOTE(review): the unit of `average_latency` is not visible from this
+  /// header (presumably seconds) -- confirm against io::LatencyGenerator.
+  SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency);
+  /// Same as above with an explicit `seed`, presumably for the random source
+  /// behind the latency generator -- confirm in the implementation.
+  SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency,
+                 int32_t seed);
+
+  std::string type_name() const override { return "slow"; }
+  bool Equals(const FileSystem& other) const override;
+
+  // Re-expose the base class GetFileInfo overloads that the overrides below
+  // would otherwise hide.
+  using FileSystem::GetFileInfo;
+  Result<FileInfo> GetFileInfo(const std::string& path) override;
+  Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+  Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  Status DeleteDir(const std::string& path) override;
+  Status DeleteDirContents(const std::string& path) override;
+  Status DeleteRootDirContents() override;
+
+  Status DeleteFile(const std::string& path) override;
+
+  Status Move(const std::string& src, const std::string& dest) override;
+
+  Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+      const std::string& path) override;
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
+  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const std::string& path) override;
+  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const FileInfo& info) override;
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+  // The filesystem that operations are delegated to.
+  std::shared_ptr<FileSystem> base_fs_;
+  // Source of the latencies inserted around delegated operations.
+  std::shared_ptr<io::LatencyGenerator> latencies_;
+};
+
+/// \defgroup filesystem-factories Functions for creating FileSystem instances
+///
+/// @{
+
+/// \brief Create a new FileSystem by URI
+///
+/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[out] out_path (optional) Path inside the filesystem. May be left
+///     as NULLPTR, the default.
+/// \return the FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
+                                                      std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI with a custom IO context
+///
+/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[in] io_context an IOContext which will be associated with the filesystem
+/// \param[out] out_path (optional) Path inside the filesystem. May be left
+///     as NULLPTR, the default.
+/// \return the FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
+                                                      const io::IOContext& io_context,
+                                                      std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI or local path
+///
+/// Same as FileSystemFromUri, but in addition also recognizes non-URIs
+/// and treats them as local filesystem paths. Only absolute local filesystem
+/// paths are allowed.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+    const std::string& uri, std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI or local path with a custom IO context
+///
+/// Same as FileSystemFromUri, but in addition also recognizes non-URIs
+/// and treats them as local filesystem paths. Only absolute local filesystem
+/// paths are allowed.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+    const std::string& uri, const io::IOContext& io_context,
+    std::string* out_path = NULLPTR);
+
+/// @}
+
+/// \brief Copy files, including from one FileSystem to another
+///
+/// If a source and destination are resident in the same FileSystem,
+/// FileSystem::CopyFile will be used; otherwise the file will be opened as a
+/// stream in both FileSystems and chunks copied from the source to the
+/// destination. No directories will be created.
+///
+/// `chunk_size` defaults to 1024 * 1024 (presumably a byte count -- confirm in
+/// the implementation) and `use_threads` defaults to true.
+ARROW_EXPORT
+Status CopyFiles(const std::vector<FileLocator>& sources,
+                 const std::vector<FileLocator>& destinations,
+                 const io::IOContext& io_context = io::default_io_context(),
+                 int64_t chunk_size = 1024 * 1024, bool use_threads = true);
+
+/// \brief Copy selected files, including from one FileSystem to another
+///
+/// Directories will be created under the destination base directory as needed.
+/// `chunk_size` and `use_threads` have the same defaults as in the overload
+/// taking explicit FileLocator vectors.
+ARROW_EXPORT
+Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
+                 const FileSelector& source_sel,
+                 const std::shared_ptr<FileSystem>& destination_fs,
+                 const std::string& destination_base_dir,
+                 const io::IOContext& io_context = io::default_io_context(),
+                 int64_t chunk_size = 1024 * 1024, bool use_threads = true);
+
+/// \brief Options accepted by the global Initialize() routine
+struct FileSystemGlobalOptions {
+  /// Path to a single PEM file holding all TLS CA certificates
+  ///
+  /// If empty, the underlying TLS library's defaults will be used.
+  std::string tls_ca_file_path;
+
+  /// Path to a directory holding TLS CA certificates in individual PEM files
+  /// named according to the OpenSSL "hashed" format.
+  ///
+  /// If empty, the underlying TLS library's defaults will be used.
+  std::string tls_ca_dir_path;
+};
+
+/// Experimental: optional global initialization routine
+///
+/// This is for environments (such as manylinux) where the path
+/// to TLS CA certificates needs to be configured at runtime.
+ARROW_EXPORT +Status Initialize(const FileSystemGlobalOptions& options); + +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc new file mode 100644 index 00000000000..775fd746aa6 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.cc @@ -0,0 +1,448 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <chrono> +#include <cstring> +#include <sstream> +#include <utility> + +#ifdef _WIN32 +#include "arrow/util/windows_compatibility.h" +#else +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/stat.h> +#endif + +#include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/file.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/uri.h" +#include "arrow/util/windows_fixup.h" + +namespace arrow { +namespace fs { + +using ::arrow::internal::IOErrorFromErrno; +#ifdef _WIN32 +using ::arrow::internal::IOErrorFromWinError; +#endif +using ::arrow::internal::NativePathString; +using ::arrow::internal::PlatformFilename; + +namespace internal { + +#ifdef _WIN32 +static bool IsDriveLetter(char c) { + // Can't use locale-dependent functions from the C/C++ stdlib + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z'); +} +#endif + +bool DetectAbsolutePath(const std::string& s) { + // Is it a /-prefixed local path? + if (s.length() >= 1 && s[0] == '/') { + return true; + } +#ifdef _WIN32 + // Is it a \-prefixed local path? + if (s.length() >= 1 && s[0] == '\\') { + return true; + } + // Does it start with a drive letter in addition to being /- or \-prefixed, + // e.g. "C:\..."? + if (s.length() >= 3 && s[1] == ':' && (s[2] == '/' || s[2] == '\\') && + IsDriveLetter(s[0])) { + return true; + } +#endif + return false; +} + +} // namespace internal + +namespace { + +#ifdef _WIN32 + +std::string NativeToString(const NativePathString& ns) { + PlatformFilename fn(ns); + return fn.ToString(); +} + +TimePoint ToTimePoint(FILETIME ft) { + // Hundreds of nanoseconds between January 1, 1601 (UTC) and the Unix epoch. 
+ static constexpr int64_t kFileTimeEpoch = 11644473600LL * 10000000; + + int64_t hundreds = (static_cast<int64_t>(ft.dwHighDateTime) << 32) + ft.dwLowDateTime - + kFileTimeEpoch; // hundreds of ns since Unix epoch + std::chrono::nanoseconds ns_count(100 * hundreds); + return TimePoint(std::chrono::duration_cast<TimePoint::duration>(ns_count)); +} + +FileInfo FileInformationToFileInfo(const BY_HANDLE_FILE_INFORMATION& information) { + FileInfo info; + if (information.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + info.set_type(FileType::Directory); + info.set_size(kNoSize); + } else { + // Regular file + info.set_type(FileType::File); + info.set_size((static_cast<int64_t>(information.nFileSizeHigh) << 32) + + information.nFileSizeLow); + } + info.set_mtime(ToTimePoint(information.ftLastWriteTime)); + return info; +} + +Result<FileInfo> StatFile(const std::wstring& path) { + HANDLE h; + std::string bytes_path = NativeToString(path); + FileInfo info; + + /* Inspired by CPython, see Modules/posixmodule.c */ + h = CreateFileW(path.c_str(), FILE_READ_ATTRIBUTES, /* desired access */ + 0, /* share mode */ + NULL, /* security attributes */ + OPEN_EXISTING, + /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */ + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS, NULL); + + if (h == INVALID_HANDLE_VALUE) { + DWORD err = GetLastError(); + if (err == ERROR_FILE_NOT_FOUND || err == ERROR_PATH_NOT_FOUND) { + info.set_path(bytes_path); + info.set_type(FileType::NotFound); + info.set_mtime(kNoTime); + info.set_size(kNoSize); + return info; + } else { + return IOErrorFromWinError(GetLastError(), "Failed querying information for path '", + bytes_path, "'"); + } + } + BY_HANDLE_FILE_INFORMATION information; + if (!GetFileInformationByHandle(h, &information)) { + CloseHandle(h); + return IOErrorFromWinError(GetLastError(), "Failed querying information for path '", + bytes_path, "'"); + } + CloseHandle(h); + info = FileInformationToFileInfo(information); + 
info.set_path(bytes_path); + return info; +} + +#else // POSIX systems + +TimePoint ToTimePoint(const struct timespec& s) { + std::chrono::nanoseconds ns_count(static_cast<int64_t>(s.tv_sec) * 1000000000 + + static_cast<int64_t>(s.tv_nsec)); + return TimePoint(std::chrono::duration_cast<TimePoint::duration>(ns_count)); +} + +FileInfo StatToFileInfo(const struct stat& s) { + FileInfo info; + if (S_ISREG(s.st_mode)) { + info.set_type(FileType::File); + info.set_size(static_cast<int64_t>(s.st_size)); + } else if (S_ISDIR(s.st_mode)) { + info.set_type(FileType::Directory); + info.set_size(kNoSize); + } else { + info.set_type(FileType::Unknown); + info.set_size(kNoSize); + } +#ifdef __APPLE__ + // macOS doesn't use the POSIX-compliant spelling + info.set_mtime(ToTimePoint(s.st_mtimespec)); +#else + info.set_mtime(ToTimePoint(s.st_mtim)); +#endif + return info; +} + +Result<FileInfo> StatFile(const std::string& path) { + FileInfo info; + struct stat s; + int r = stat(path.c_str(), &s); + if (r == -1) { + if (errno == ENOENT || errno == ENOTDIR || errno == ELOOP) { + info.set_type(FileType::NotFound); + info.set_mtime(kNoTime); + info.set_size(kNoSize); + } else { + return IOErrorFromErrno(errno, "Failed stat()ing path '", path, "'"); + } + } else { + info = StatToFileInfo(s); + } + info.set_path(path); + return info; +} + +#endif + +Status StatSelector(const PlatformFilename& dir_fn, const FileSelector& select, + int32_t nesting_depth, std::vector<FileInfo>* out) { + auto result = ListDir(dir_fn); + if (!result.ok()) { + auto status = result.status(); + if (select.allow_not_found && status.IsIOError()) { + ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn)); + if (!exists) { + return Status::OK(); + } + } + return status; + } + + for (const auto& child_fn : *result) { + PlatformFilename full_fn = dir_fn.Join(child_fn); + ARROW_ASSIGN_OR_RAISE(FileInfo info, StatFile(full_fn.ToNative())); + if (info.type() != FileType::NotFound) { + out->push_back(std::move(info)); + } 
+ if (nesting_depth < select.max_recursion && select.recursive && + info.type() == FileType::Directory) { + RETURN_NOT_OK(StatSelector(full_fn, select, nesting_depth + 1, out)); + } + } + return Status::OK(); +} + +} // namespace + +LocalFileSystemOptions LocalFileSystemOptions::Defaults() { + return LocalFileSystemOptions(); +} + +bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { + return use_mmap == other.use_mmap; +} + +Result<LocalFileSystemOptions> LocalFileSystemOptions::FromUri( + const ::arrow::internal::Uri& uri, std::string* out_path) { + if (!uri.username().empty() || !uri.password().empty()) { + return Status::Invalid("Unsupported username or password in local URI: '", + uri.ToString(), "'"); + } + std::string path; + const auto host = uri.host(); + if (!host.empty()) { +#ifdef _WIN32 + std::stringstream ss; + ss << "//" << host << "/" << internal::RemoveLeadingSlash(uri.path()); + *out_path = ss.str(); +#else + return Status::Invalid("Unsupported hostname in non-Windows local URI: '", + uri.ToString(), "'"); +#endif + } else { + *out_path = uri.path(); + } + + // TODO handle use_mmap option + return LocalFileSystemOptions(); +} + +LocalFileSystem::LocalFileSystem(const io::IOContext& io_context) + : FileSystem(io_context), options_(LocalFileSystemOptions::Defaults()) {} + +LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options, + const io::IOContext& io_context) + : FileSystem(io_context), options_(options) {} + +LocalFileSystem::~LocalFileSystem() {} + +Result<std::string> LocalFileSystem::NormalizePath(std::string path) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + return fn.ToString(); +} + +bool LocalFileSystem::Equals(const FileSystem& other) const { + if (other.type_name() != type_name()) { + return false; + } else { + const auto& localfs = ::arrow::internal::checked_cast<const LocalFileSystem&>(other); + return options_.Equals(localfs.options()); + } +} + 
+Result<FileInfo> LocalFileSystem::GetFileInfo(const std::string& path) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + return StatFile(fn.ToNative()); +} + +Result<std::vector<FileInfo>> LocalFileSystem::GetFileInfo(const FileSelector& select) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(select.base_dir)); + std::vector<FileInfo> results; + RETURN_NOT_OK(StatSelector(fn, select, 0, &results)); + return results; +} + +Status LocalFileSystem::CreateDir(const std::string& path, bool recursive) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + if (recursive) { + return ::arrow::internal::CreateDirTree(fn).status(); + } else { + return ::arrow::internal::CreateDir(fn).status(); + } +} + +Status LocalFileSystem::DeleteDir(const std::string& path) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + auto st = ::arrow::internal::DeleteDirTree(fn, /*allow_not_found=*/false).status(); + if (!st.ok()) { + // TODO Status::WithPrefix()? 
+ std::stringstream ss; + ss << "Cannot delete directory '" << path << "': " << st.message(); + return st.WithMessage(ss.str()); + } + return Status::OK(); +} + +Status LocalFileSystem::DeleteDirContents(const std::string& path) { + if (internal::IsEmptyPath(path)) { + return internal::InvalidDeleteDirContents(path); + } + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + auto st = ::arrow::internal::DeleteDirContents(fn, /*allow_not_found=*/false).status(); + if (!st.ok()) { + std::stringstream ss; + ss << "Cannot delete directory contents in '" << path << "': " << st.message(); + return st.WithMessage(ss.str()); + } + return Status::OK(); +} + +Status LocalFileSystem::DeleteRootDirContents() { + return Status::Invalid("LocalFileSystem::DeleteRootDirContents is strictly forbidden"); +} + +Status LocalFileSystem::DeleteFile(const std::string& path) { + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + return ::arrow::internal::DeleteFile(fn, /*allow_not_found=*/false).status(); +} + +Status LocalFileSystem::Move(const std::string& src, const std::string& dest) { + ARROW_ASSIGN_OR_RAISE(auto sfn, PlatformFilename::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dfn, PlatformFilename::FromString(dest)); + +#ifdef _WIN32 + if (!MoveFileExW(sfn.ToNative().c_str(), dfn.ToNative().c_str(), + MOVEFILE_REPLACE_EXISTING)) { + return IOErrorFromWinError(GetLastError(), "Failed renaming '", sfn.ToString(), + "' to '", dfn.ToString(), "'"); + } +#else + if (rename(sfn.ToNative().c_str(), dfn.ToNative().c_str()) == -1) { + return IOErrorFromErrno(errno, "Failed renaming '", sfn.ToString(), "' to '", + dfn.ToString(), "'"); + } +#endif + return Status::OK(); +} + +Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest) { + ARROW_ASSIGN_OR_RAISE(auto sfn, PlatformFilename::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dfn, PlatformFilename::FromString(dest)); + // XXX should we use fstat() to compare inodes? 
+ if (sfn.ToNative() == dfn.ToNative()) { + return Status::OK(); + } + +#ifdef _WIN32 + if (!CopyFileW(sfn.ToNative().c_str(), dfn.ToNative().c_str(), + FALSE /* bFailIfExists */)) { + return IOErrorFromWinError(GetLastError(), "Failed copying '", sfn.ToString(), + "' to '", dfn.ToString(), "'"); + } + return Status::OK(); +#else + ARROW_ASSIGN_OR_RAISE(auto is, OpenInputStream(src)); + ARROW_ASSIGN_OR_RAISE(auto os, OpenOutputStream(dest)); + RETURN_NOT_OK(internal::CopyStream(is, os, 1024 * 1024 /* chunk_size */, io_context())); + RETURN_NOT_OK(os->Close()); + return is->Close(); +#endif +} + +namespace { + +template <typename InputStreamType> +Result<std::shared_ptr<InputStreamType>> OpenInputStreamGeneric( + const std::string& path, const LocalFileSystemOptions& options, + const io::IOContext& io_context) { + if (options.use_mmap) { + return io::MemoryMappedFile::Open(path, io::FileMode::READ); + } else { + return io::ReadableFile::Open(path, io_context.pool()); + } +} + +} // namespace + +Result<std::shared_ptr<io::InputStream>> LocalFileSystem::OpenInputStream( + const std::string& path) { + return OpenInputStreamGeneric<io::InputStream>(path, options_, io_context()); +} + +Result<std::shared_ptr<io::RandomAccessFile>> LocalFileSystem::OpenInputFile( + const std::string& path) { + return OpenInputStreamGeneric<io::RandomAccessFile>(path, options_, io_context()); +} + +namespace { + +Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::string& path, + bool truncate, + bool append) { + int fd; + bool write_only = true; + ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); + ARROW_ASSIGN_OR_RAISE( + fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append)); + auto maybe_stream = io::FileOutputStream::Open(fd); + if (!maybe_stream.ok()) { + ARROW_UNUSED(::arrow::internal::FileClose(fd)); + } + return maybe_stream; +} + +} // namespace + +Result<std::shared_ptr<io::OutputStream>> 
LocalFileSystem::OpenOutputStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + bool truncate = true; + bool append = false; + return OpenOutputStreamGeneric(path, truncate, append); +} + +Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + bool truncate = false; + bool append = true; + return OpenOutputStreamGeneric(path, truncate, append); +} + +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h new file mode 100644 index 00000000000..f8e77aee591 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/localfs.h @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/filesystem/filesystem.h" + +namespace arrow { +namespace internal { + +class Uri; + +} + +namespace fs { + +/// Options for the LocalFileSystem implementation. 
+struct ARROW_EXPORT LocalFileSystemOptions {
+  /// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
+  /// or a regular one.
+  bool use_mmap = false;
+
+  /// \brief Initialize with defaults
+  static LocalFileSystemOptions Defaults();
+
+  /// \brief Return whether both option sets have the same `use_mmap` value
+  bool Equals(const LocalFileSystemOptions& other) const;
+
+  /// \brief Build options from a parsed local URI
+  ///
+  /// URIs with a username or password are rejected, and a hostname is only
+  /// accepted on Windows. The path extracted from the URI is written to
+  /// `out_path`.
+  static Result<LocalFileSystemOptions> FromUri(const ::arrow::internal::Uri& uri,
+                                                std::string* out_path);
+};
+
+/// \brief A FileSystem implementation accessing files on the local machine.
+///
+/// This class handles only `/`-separated paths. If desired, conversion
+/// from Windows backslash-separated paths should be done by the caller.
+/// Details such as symlinks are abstracted away (symlinks are always
+/// followed, except when deleting an entry).
+class ARROW_EXPORT LocalFileSystem : public FileSystem {
+ public:
+  /// Construct with default options.
+  explicit LocalFileSystem(const io::IOContext& = io::default_io_context());
+  /// Construct with the given options.
+  explicit LocalFileSystem(const LocalFileSystemOptions&,
+                           const io::IOContext& = io::default_io_context());
+  ~LocalFileSystem() override;
+
+  std::string type_name() const override { return "local"; }
+
+  Result<std::string> NormalizePath(std::string path) override;
+
+  /// Equal when `other` is also a local filesystem with equal options.
+  bool Equals(const FileSystem& other) const override;
+
+  /// A copy of the options this filesystem was created with.
+  LocalFileSystemOptions options() const { return options_; }
+
+  // Re-expose the base class GetFileInfo overloads that the overrides below
+  // would otherwise hide.
+  /// \cond FALSE
+  using FileSystem::GetFileInfo;
+  /// \endcond
+  Result<FileInfo> GetFileInfo(const std::string& path) override;
+  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+
+  Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  Status DeleteDir(const std::string& path) override;
+  Status DeleteDirContents(const std::string& path) override;
+  /// Always returns an Invalid status for the local filesystem.
+  Status DeleteRootDirContents() override;
+
+  Status DeleteFile(const std::string& path) override;
+
+  Status Move(const std::string& src, const std::string& dest) override;
+
+  Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(
+      const std::string& path) override;
+  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
+      const std::string& path) override;
+  // `metadata` is accepted for interface compatibility; the implementation
+  // in localfs.cc does not use it.
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
+
+ protected:
+  LocalFileSystemOptions options_;
+};
+
+namespace internal {
+
+// Return whether the string is detected as a local absolute path.
+// A leading '/' always qualifies; on Windows a leading '\' or a drive-letter
+// prefix such as "C:\" or "C:/" qualifies as well.
+ARROW_EXPORT
+bool DetectAbsolutePath(const std::string& s);
+
+}  // namespace internal
+
+}  // namespace fs
+}  // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc
new file mode 100644
index 00000000000..14a38283b26
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.cc
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include <algorithm> +#include <iterator> +#include <map> +#include <mutex> +#include <sstream> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/buffer_builder.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" +#include "arrow/util/variant.h" +#include "arrow/util/windows_fixup.h" + +namespace arrow { +namespace fs { +namespace internal { + +namespace { + +//////////////////////////////////////////////////////////////////////////// +// Filesystem structure + +class Entry; + +struct File { + TimePoint mtime; + std::string name; + std::shared_ptr<Buffer> data; + std::shared_ptr<const KeyValueMetadata> metadata; + + File(TimePoint mtime, std::string name) : mtime(mtime), name(std::move(name)) {} + + int64_t size() const { return data ? 
data->size() : 0; } + + explicit operator util::string_view() const { + if (data) { + return util::string_view(*data); + } else { + return ""; + } + } +}; + +struct Directory { + std::string name; + TimePoint mtime; + std::map<std::string, std::unique_ptr<Entry>> entries; + + Directory(std::string name, TimePoint mtime) : name(std::move(name)), mtime(mtime) {} + Directory(Directory&& other) noexcept + : name(std::move(other.name)), + mtime(other.mtime), + entries(std::move(other.entries)) {} + + Directory& operator=(Directory&& other) noexcept { + name = std::move(other.name); + mtime = other.mtime; + entries = std::move(other.entries); + return *this; + } + + Entry* Find(const std::string& s) { + auto it = entries.find(s); + if (it != entries.end()) { + return it->second.get(); + } else { + return nullptr; + } + } + + bool CreateEntry(const std::string& s, std::unique_ptr<Entry> entry) { + DCHECK(!s.empty()); + auto p = entries.emplace(s, std::move(entry)); + return p.second; + } + + void AssignEntry(const std::string& s, std::unique_ptr<Entry> entry) { + DCHECK(!s.empty()); + entries[s] = std::move(entry); + } + + bool DeleteEntry(const std::string& s) { return entries.erase(s) > 0; } + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Directory); +}; + +// A filesystem entry +using EntryBase = util::Variant<std::nullptr_t, File, Directory>; + +class Entry : public EntryBase { + public: + Entry(Entry&&) = default; + Entry& operator=(Entry&&) = default; + explicit Entry(Directory&& v) : EntryBase(std::move(v)) {} + explicit Entry(File&& v) : EntryBase(std::move(v)) {} + + bool is_dir() const { return util::holds_alternative<Directory>(*this); } + + bool is_file() const { return util::holds_alternative<File>(*this); } + + Directory& as_dir() { return util::get<Directory>(*this); } + + File& as_file() { return util::get<File>(*this); } + + // Get info for this entry. Note the path() property isn't set. 
+ FileInfo GetInfo() { + FileInfo info; + if (is_dir()) { + Directory& dir = as_dir(); + info.set_type(FileType::Directory); + info.set_mtime(dir.mtime); + } else { + DCHECK(is_file()); + File& file = as_file(); + info.set_type(FileType::File); + info.set_mtime(file.mtime); + info.set_size(file.size()); + } + return info; + } + + // Get info for this entry, knowing the parent path. + FileInfo GetInfo(const std::string& base_path) { + FileInfo info; + if (is_dir()) { + Directory& dir = as_dir(); + info.set_type(FileType::Directory); + info.set_mtime(dir.mtime); + info.set_path(ConcatAbstractPath(base_path, dir.name)); + } else { + DCHECK(is_file()); + File& file = as_file(); + info.set_type(FileType::File); + info.set_mtime(file.mtime); + info.set_size(file.size()); + info.set_path(ConcatAbstractPath(base_path, file.name)); + } + return info; + } + + // Set the entry name + void SetName(const std::string& name) { + if (is_dir()) { + as_dir().name = name; + } else { + DCHECK(is_file()); + as_file().name = name; + } + } + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Entry); +}; + +//////////////////////////////////////////////////////////////////////////// +// Streams + +class MockFSOutputStream : public io::OutputStream { + public: + MockFSOutputStream(File* file, MemoryPool* pool) + : file_(file), builder_(pool), closed_(false) {} + + ~MockFSOutputStream() override = default; + + // Implement the OutputStream interface + Status Close() override { + if (!closed_) { + RETURN_NOT_OK(builder_.Finish(&file_->data)); + closed_ = true; + } + return Status::OK(); + } + + Status Abort() override { + if (!closed_) { + // MockFSOutputStream is mainly used for debugging and testing, so + // mark an aborted file's contents explicitly. 
+ std::stringstream ss; + ss << "MockFSOutputStream aborted after " << file_->size() << " bytes written"; + file_->data = Buffer::FromString(ss.str()); + closed_ = true; + } + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result<int64_t> Tell() const override { + if (closed_) { + return Status::Invalid("Invalid operation on closed stream"); + } + return builder_.length(); + } + + Status Write(const void* data, int64_t nbytes) override { + if (closed_) { + return Status::Invalid("Invalid operation on closed stream"); + } + return builder_.Append(data, nbytes); + } + + protected: + File* file_; + BufferBuilder builder_; + bool closed_; +}; + +class MockFSInputStream : public io::BufferReader { + public: + explicit MockFSInputStream(const File& file) + : io::BufferReader(file.data), metadata_(file.metadata) {} + + Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override { + return metadata_; + } + + protected: + std::shared_ptr<const KeyValueMetadata> metadata_; +}; + +} // namespace + +std::ostream& operator<<(std::ostream& os, const MockDirInfo& di) { + return os << "'" << di.full_path << "' [mtime=" << di.mtime.time_since_epoch().count() + << "]"; +} + +std::ostream& operator<<(std::ostream& os, const MockFileInfo& di) { + return os << "'" << di.full_path << "' [mtime=" << di.mtime.time_since_epoch().count() + << ", size=" << di.data.length() << "]"; +} + +//////////////////////////////////////////////////////////////////////////// +// MockFileSystem implementation + +class MockFileSystem::Impl { + public: + TimePoint current_time; + MemoryPool* pool; + + // The root directory + Entry root; + std::mutex mutex; + + Impl(TimePoint current_time, MemoryPool* pool) + : current_time(current_time), pool(pool), root(Directory("", current_time)) {} + + std::unique_lock<std::mutex> lock_guard() { + return std::unique_lock<std::mutex>(mutex); + } + + Directory& RootDir() { return root.as_dir(); } + + template <typename It> + 
Entry* FindEntry(It it, It end, size_t* nconsumed) { + size_t consumed = 0; + Entry* entry = &root; + + for (; it != end; ++it) { + const std::string& part = *it; + DCHECK(entry->is_dir()); + Entry* child = entry->as_dir().Find(part); + if (child == nullptr) { + // Partial find only + break; + } + ++consumed; + entry = child; + if (entry->is_file()) { + // Cannot go any further + break; + } + // Recurse + } + *nconsumed = consumed; + return entry; + } + + // Find an entry, allowing partial matching + Entry* FindEntry(const std::vector<std::string>& parts, size_t* nconsumed) { + return FindEntry(parts.begin(), parts.end(), nconsumed); + } + + // Find an entry, only full matching allowed + Entry* FindEntry(const std::vector<std::string>& parts) { + size_t consumed; + auto entry = FindEntry(parts, &consumed); + return (consumed == parts.size()) ? entry : nullptr; + } + + // Find the parent entry, only full matching allowed + Entry* FindParent(const std::vector<std::string>& parts) { + if (parts.size() == 0) { + return nullptr; + } + size_t consumed; + auto last = parts.end(); + last--; + auto entry = FindEntry(parts.begin(), last, &consumed); + return (consumed == parts.size() - 1) ? 
entry : nullptr; + } + + void GatherInfos(const FileSelector& select, const std::string& base_path, + const Directory& base_dir, int32_t nesting_depth, + std::vector<FileInfo>* infos) { + for (const auto& pair : base_dir.entries) { + Entry* child = pair.second.get(); + infos->push_back(child->GetInfo(base_path)); + if (select.recursive && nesting_depth < select.max_recursion && child->is_dir()) { + Directory& child_dir = child->as_dir(); + std::string child_path = infos->back().path(); + GatherInfos(select, std::move(child_path), child_dir, nesting_depth + 1, infos); + } + } + } + + void DumpDirs(const std::string& prefix, const Directory& dir, + std::vector<MockDirInfo>* out) { + std::string path = prefix + dir.name; + if (!path.empty()) { + out->push_back({path, dir.mtime}); + path += "/"; + } + for (const auto& pair : dir.entries) { + Entry* child = pair.second.get(); + if (child->is_dir()) { + DumpDirs(path, child->as_dir(), out); + } + } + } + + void DumpFiles(const std::string& prefix, const Directory& dir, + std::vector<MockFileInfo>* out) { + std::string path = prefix + dir.name; + if (!path.empty()) { + path += "/"; + } + for (const auto& pair : dir.entries) { + Entry* child = pair.second.get(); + if (child->is_file()) { + auto& file = child->as_file(); + out->push_back({path + file.name, file.mtime, util::string_view(file)}); + } else if (child->is_dir()) { + DumpFiles(path, child->as_dir(), out); + } + } + } + + Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( + const std::string& path, bool append, + const std::shared_ptr<const KeyValueMetadata>& metadata) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + Entry* parent = FindParent(parts); + if (parent == nullptr || !parent->is_dir()) { + return PathNotFound(path); + } + // Find the file in the parent dir, or create it + const auto& name = parts.back(); + Entry* child = parent->as_dir().Find(name); + File* file; + if (child == nullptr) { + child 
= new Entry(File(current_time, name)); + parent->as_dir().AssignEntry(name, std::unique_ptr<Entry>(child)); + file = &child->as_file(); + } else if (child->is_file()) { + file = &child->as_file(); + file->mtime = current_time; + } else { + return NotAFile(path); + } + file->metadata = metadata; + auto ptr = std::make_shared<MockFSOutputStream>(file, pool); + if (append && file->data) { + RETURN_NOT_OK(ptr->Write(file->data->data(), file->data->size())); + } + return ptr; + } + + Result<std::shared_ptr<io::BufferReader>> OpenInputReader(const std::string& path) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + Entry* entry = FindEntry(parts); + if (entry == nullptr) { + return PathNotFound(path); + } + if (!entry->is_file()) { + return NotAFile(path); + } + return std::make_shared<MockFSInputStream>(entry->as_file()); + } +}; + +MockFileSystem::~MockFileSystem() = default; + +MockFileSystem::MockFileSystem(TimePoint current_time, const io::IOContext& io_context) { + impl_ = std::unique_ptr<Impl>(new Impl(current_time, io_context.pool())); +} + +bool MockFileSystem::Equals(const FileSystem& other) const { return this == &other; } + +Status MockFileSystem::CreateDir(const std::string& path, bool recursive) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + size_t consumed; + Entry* entry = impl_->FindEntry(parts, &consumed); + if (!entry->is_dir()) { + auto file_path = JoinAbstractPath(parts.begin(), parts.begin() + consumed); + return Status::IOError("Cannot create directory '", path, "': ", "ancestor '", + file_path, "' is not a directory"); + } + if (!recursive && (parts.size() - consumed) > 1) { + return Status::IOError("Cannot create directory '", path, + "': ", "parent does not exist"); + } + for (size_t i = consumed; i < parts.size(); ++i) { + const auto& name = parts[i]; + std::unique_ptr<Entry> child(new Entry(Directory(name, 
impl_->current_time))); + Entry* child_ptr = child.get(); + bool inserted = entry->as_dir().CreateEntry(name, std::move(child)); + // No race condition on insertion is possible, as all operations are locked + DCHECK(inserted); + entry = child_ptr; + } + return Status::OK(); +} + +Status MockFileSystem::DeleteDir(const std::string& path) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + Entry* parent = impl_->FindParent(parts); + if (parent == nullptr || !parent->is_dir()) { + return PathNotFound(path); + } + Directory& parent_dir = parent->as_dir(); + auto child = parent_dir.Find(parts.back()); + if (child == nullptr) { + return PathNotFound(path); + } + if (!child->is_dir()) { + return NotADir(path); + } + + bool deleted = parent_dir.DeleteEntry(parts.back()); + DCHECK(deleted); + return Status::OK(); +} + +Status MockFileSystem::DeleteDirContents(const std::string& path) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + if (parts.empty()) { + // Wipe filesystem + return internal::InvalidDeleteDirContents(path); + } + + Entry* entry = impl_->FindEntry(parts); + if (entry == nullptr) { + return PathNotFound(path); + } + if (!entry->is_dir()) { + return NotADir(path); + } + entry->as_dir().entries.clear(); + return Status::OK(); +} + +Status MockFileSystem::DeleteRootDirContents() { + auto guard = impl_->lock_guard(); + + impl_->RootDir().entries.clear(); + return Status::OK(); +} + +Status MockFileSystem::DeleteFile(const std::string& path) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + Entry* parent = impl_->FindParent(parts); + if (parent == nullptr || !parent->is_dir()) { + return PathNotFound(path); + } + Directory& parent_dir = parent->as_dir(); + auto child = parent_dir.Find(parts.back()); + if (child == 
nullptr) { + return PathNotFound(path); + } + if (!child->is_file()) { + return NotAFile(path); + } + bool deleted = parent_dir.DeleteEntry(parts.back()); + DCHECK(deleted); + return Status::OK(); +} + +Result<FileInfo> MockFileSystem::GetFileInfo(const std::string& path) { + auto parts = SplitAbstractPath(path); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + FileInfo info; + Entry* entry = impl_->FindEntry(parts); + if (entry == nullptr) { + info.set_type(FileType::NotFound); + } else { + info = entry->GetInfo(); + } + info.set_path(path); + return info; +} + +Result<FileInfoVector> MockFileSystem::GetFileInfo(const FileSelector& selector) { + auto parts = SplitAbstractPath(selector.base_dir); + RETURN_NOT_OK(ValidateAbstractPathParts(parts)); + + auto guard = impl_->lock_guard(); + + FileInfoVector results; + + Entry* base_dir = impl_->FindEntry(parts); + if (base_dir == nullptr) { + // Base directory does not exist + if (selector.allow_not_found) { + return results; + } else { + return PathNotFound(selector.base_dir); + } + } + if (!base_dir->is_dir()) { + return NotADir(selector.base_dir); + } + + impl_->GatherInfos(selector, selector.base_dir, base_dir->as_dir(), 0, &results); + return results; +} + +namespace { + +// Helper for binary operations (move, copy) +struct BinaryOp { + std::vector<std::string> src_parts; + std::vector<std::string> dest_parts; + Directory& src_dir; + Directory& dest_dir; + std::string src_name; + std::string dest_name; + Entry* src_entry; + Entry* dest_entry; + + template <typename OpFunc> + static Status Run(MockFileSystem::Impl* impl, const std::string& src, + const std::string& dest, OpFunc&& op_func) { + auto src_parts = SplitAbstractPath(src); + auto dest_parts = SplitAbstractPath(dest); + RETURN_NOT_OK(ValidateAbstractPathParts(src_parts)); + RETURN_NOT_OK(ValidateAbstractPathParts(dest_parts)); + + auto guard = impl->lock_guard(); + + // Both source and destination must have valid 
parents + Entry* src_parent = impl->FindParent(src_parts); + if (src_parent == nullptr || !src_parent->is_dir()) { + return PathNotFound(src); + } + Entry* dest_parent = impl->FindParent(dest_parts); + if (dest_parent == nullptr || !dest_parent->is_dir()) { + return PathNotFound(dest); + } + Directory& src_dir = src_parent->as_dir(); + Directory& dest_dir = dest_parent->as_dir(); + DCHECK_GE(src_parts.size(), 1); + DCHECK_GE(dest_parts.size(), 1); + const auto& src_name = src_parts.back(); + const auto& dest_name = dest_parts.back(); + + BinaryOp op{std::move(src_parts), + std::move(dest_parts), + src_dir, + dest_dir, + src_name, + dest_name, + src_dir.Find(src_name), + dest_dir.Find(dest_name)}; + + return op_func(std::move(op)); + } +}; + +} // namespace + +Status MockFileSystem::Move(const std::string& src, const std::string& dest) { + return BinaryOp::Run(impl_.get(), src, dest, [&](const BinaryOp& op) -> Status { + if (op.src_entry == nullptr) { + return PathNotFound(src); + } + if (op.dest_entry != nullptr) { + if (op.dest_entry->is_dir()) { + return Status::IOError("Cannot replace destination '", dest, + "', which is a directory"); + } + if (op.dest_entry->is_file() && op.src_entry->is_dir()) { + return Status::IOError("Cannot replace destination '", dest, + "', which is a file, with directory '", src, "'"); + } + } + if (op.src_parts.size() < op.dest_parts.size()) { + // Check if dest is a child of src + auto p = + std::mismatch(op.src_parts.begin(), op.src_parts.end(), op.dest_parts.begin()); + if (p.first == op.src_parts.end()) { + return Status::IOError("Cannot move '", src, "' into child path '", dest, "'"); + } + } + + // Move original entry, fix its name + std::unique_ptr<Entry> new_entry(new Entry(std::move(*op.src_entry))); + new_entry->SetName(op.dest_name); + bool deleted = op.src_dir.DeleteEntry(op.src_name); + DCHECK(deleted); + op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry)); + return Status::OK(); + }); +} + +Status 
MockFileSystem::CopyFile(const std::string& src, const std::string& dest) { + return BinaryOp::Run(impl_.get(), src, dest, [&](const BinaryOp& op) -> Status { + if (op.src_entry == nullptr) { + return PathNotFound(src); + } + if (!op.src_entry->is_file()) { + return NotAFile(src); + } + if (op.dest_entry != nullptr && op.dest_entry->is_dir()) { + return Status::IOError("Cannot replace destination '", dest, + "', which is a directory"); + } + + // Copy original entry, fix its name + std::unique_ptr<Entry> new_entry(new Entry(File(op.src_entry->as_file()))); + new_entry->SetName(op.dest_name); + op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry)); + return Status::OK(); + }); +} + +Result<std::shared_ptr<io::InputStream>> MockFileSystem::OpenInputStream( + const std::string& path) { + auto guard = impl_->lock_guard(); + + return impl_->OpenInputReader(path); +} + +Result<std::shared_ptr<io::RandomAccessFile>> MockFileSystem::OpenInputFile( + const std::string& path) { + auto guard = impl_->lock_guard(); + + return impl_->OpenInputReader(path); +} + +Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenOutputStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + auto guard = impl_->lock_guard(); + + return impl_->OpenOutputStream(path, /*append=*/false, metadata); +} + +Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenAppendStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + auto guard = impl_->lock_guard(); + + return impl_->OpenOutputStream(path, /*append=*/true, metadata); +} + +std::vector<MockDirInfo> MockFileSystem::AllDirs() { + auto guard = impl_->lock_guard(); + + std::vector<MockDirInfo> result; + impl_->DumpDirs("", impl_->RootDir(), &result); + return result; +} + +std::vector<MockFileInfo> MockFileSystem::AllFiles() { + auto guard = impl_->lock_guard(); + + std::vector<MockFileInfo> result; + impl_->DumpFiles("", impl_->RootDir(), &result); + 
return result; +} + +Status MockFileSystem::CreateFile(const std::string& path, util::string_view contents, + bool recursive) { + auto parent = fs::internal::GetAbstractPathParent(path).first; + + if (parent != "") { + RETURN_NOT_OK(CreateDir(parent, recursive)); + } + + ARROW_ASSIGN_OR_RAISE(auto file, OpenOutputStream(path)); + RETURN_NOT_OK(file->Write(contents)); + return file->Close(); +} + +Result<std::shared_ptr<FileSystem>> MockFileSystem::Make( + TimePoint current_time, const std::vector<FileInfo>& infos) { + auto fs = std::make_shared<MockFileSystem>(current_time); + for (const auto& info : infos) { + switch (info.type()) { + case FileType::Directory: + RETURN_NOT_OK(fs->CreateDir(info.path(), /*recursive*/ true)); + break; + case FileType::File: + RETURN_NOT_OK(fs->CreateFile(info.path(), "", /*recursive*/ true)); + break; + default: + break; + } + } + + return fs; +} + +FileInfoGenerator MockAsyncFileSystem::GetFileInfoGenerator(const FileSelector& select) { + auto maybe_infos = GetFileInfo(select); + if (maybe_infos.ok()) { + // Return the FileInfo entries one by one + const auto& infos = *maybe_infos; + std::vector<FileInfoVector> chunks(infos.size()); + std::transform(infos.begin(), infos.end(), chunks.begin(), + [](const FileInfo& info) { return FileInfoVector{info}; }); + return MakeVectorGenerator(std::move(chunks)); + } else { + return MakeFailingGenerator(maybe_infos); + } +} + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h new file mode 100644 index 00000000000..378f30d295d --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/mockfs.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <iosfwd> +#include <memory> +#include <string> +#include <vector> + +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/string_view.h" +#include "arrow/util/windows_fixup.h" + +namespace arrow { +namespace fs { +namespace internal { + +struct MockDirInfo { + std::string full_path; + TimePoint mtime; + + bool operator==(const MockDirInfo& other) const { + return mtime == other.mtime && full_path == other.full_path; + } + + friend ARROW_EXPORT std::ostream& operator<<(std::ostream&, const MockDirInfo&); +}; + +struct MockFileInfo { + std::string full_path; + TimePoint mtime; + util::string_view data; + + bool operator==(const MockFileInfo& other) const { + return mtime == other.mtime && full_path == other.full_path && data == other.data; + } + + friend ARROW_EXPORT std::ostream& operator<<(std::ostream&, const MockFileInfo&); +}; + +/// A mock FileSystem implementation that holds its contents in memory. +/// +/// Useful for validating the FileSystem API, writing conformance suite, +/// and bootstrapping FileSystem-based APIs. 
+class ARROW_EXPORT MockFileSystem : public FileSystem { + public: + explicit MockFileSystem(TimePoint current_time, + const io::IOContext& = io::default_io_context()); + ~MockFileSystem() override; + + std::string type_name() const override { return "mock"; } + + bool Equals(const FileSystem& other) const override; + + // XXX It's not very practical to have to explicitly declare inheritance + // of default overrides. + using FileSystem::GetFileInfo; + Result<FileInfo> GetFileInfo(const std::string& path) override; + Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + Status DeleteDirContents(const std::string& path) override; + Status DeleteRootDirContents() override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Result<std::shared_ptr<io::InputStream>> OpenInputStream( + const std::string& path) override; + Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( + const std::string& path) override; + Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( + const std::string& path, + const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override; + Result<std::shared_ptr<io::OutputStream>> OpenAppendStream( + const std::string& path, + const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override; + + // Contents-dumping helpers to ease testing. + // Output is lexicographically-ordered by full path. + std::vector<MockDirInfo> AllDirs(); + std::vector<MockFileInfo> AllFiles(); + + // Create a File with a content from a string. + Status CreateFile(const std::string& path, util::string_view content, + bool recursive = true); + + // Create a MockFileSystem out of (empty) FileInfo. 
The content of every + // file is empty and of size 0. All directories will be created recursively. + static Result<std::shared_ptr<FileSystem>> Make(TimePoint current_time, + const std::vector<FileInfo>& infos); + + class Impl; + + protected: + std::unique_ptr<Impl> impl_; +}; + +class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem { + public: + explicit MockAsyncFileSystem(TimePoint current_time, + const io::IOContext& io_context = io::default_io_context()) + : MockFileSystem(current_time, io_context) { + default_async_is_sync_ = false; + } + + FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; +}; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc new file mode 100644 index 00000000000..f1bd5c087bf --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.cc @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <algorithm> + +#include "arrow/filesystem/path_util.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" + +namespace arrow { +namespace fs { +namespace internal { + +// XXX How does this encode Windows UNC paths? + +std::vector<std::string> SplitAbstractPath(const std::string& path) { + std::vector<std::string> parts; + auto v = util::string_view(path); + // Strip trailing slash + if (v.length() > 0 && v.back() == kSep) { + v = v.substr(0, v.length() - 1); + } + // Strip leading slash + if (v.length() > 0 && v.front() == kSep) { + v = v.substr(1); + } + if (v.length() == 0) { + return parts; + } + + auto append_part = [&parts, &v](size_t start, size_t end) { + parts.push_back(std::string(v.substr(start, end - start))); + }; + + size_t start = 0; + while (true) { + size_t end = v.find_first_of(kSep, start); + append_part(start, end); + if (end == std::string::npos) { + break; + } + start = end + 1; + } + return parts; +} + +std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) { + // XXX should strip trailing slash? 
+ + auto pos = s.find_last_of(kSep); + if (pos == std::string::npos) { + // Empty parent + return {{}, s}; + } + return {s.substr(0, pos), s.substr(pos + 1)}; +} + +std::string GetAbstractPathExtension(const std::string& s) { + util::string_view basename(s); + auto offset = basename.find_last_of(kSep); + if (offset != std::string::npos) { + basename = basename.substr(offset); + } + auto dot = basename.find_last_of('.'); + if (dot == util::string_view::npos) { + // Empty extension + return ""; + } + return std::string(basename.substr(dot + 1)); +} + +Status ValidateAbstractPathParts(const std::vector<std::string>& parts) { + for (const auto& part : parts) { + if (part.length() == 0) { + return Status::Invalid("Empty path component"); + } + if (part.find_first_of(kSep) != std::string::npos) { + return Status::Invalid("Separator in component '", part, "'"); + } + } + return Status::OK(); +} + +std::string ConcatAbstractPath(const std::string& base, const std::string& stem) { + DCHECK(!stem.empty()); + if (base.empty()) { + return stem; + } + return EnsureTrailingSlash(base) + std::string(RemoveLeadingSlash(stem)); +} + +std::string EnsureTrailingSlash(util::string_view v) { + if (v.length() > 0 && v.back() != kSep) { + // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"... + // Unless the local filesystem always uses absolute paths + return std::string(v) + kSep; + } else { + return std::string(v); + } +} + +std::string EnsureLeadingSlash(util::string_view v) { + if (v.length() == 0 || v.front() != kSep) { + // XXX How about "C:" on Windows? We probably don't want to turn it into "/C:"... 
+ return kSep + std::string(v); + } else { + return std::string(v); + } +} +util::string_view RemoveTrailingSlash(util::string_view key) { + while (!key.empty() && key.back() == kSep) { + key.remove_suffix(1); + } + return key; +} + +util::string_view RemoveLeadingSlash(util::string_view key) { + while (!key.empty() && key.front() == kSep) { + key.remove_prefix(1); + } + return key; +} + +Result<std::string> MakeAbstractPathRelative(const std::string& base, + const std::string& path) { + if (base.empty() || base.front() != kSep) { + return Status::Invalid("MakeAbstractPathRelative called with non-absolute base '", + base, "'"); + } + auto b = EnsureLeadingSlash(RemoveTrailingSlash(base)); + auto p = util::string_view(path); + if (p.substr(0, b.size()) != util::string_view(b)) { + return Status::Invalid("Path '", path, "' is not relative to '", base, "'"); + } + p = p.substr(b.size()); + if (!p.empty() && p.front() != kSep && b.back() != kSep) { + return Status::Invalid("Path '", path, "' is not relative to '", base, "'"); + } + return std::string(RemoveLeadingSlash(p)); +} + +bool IsAncestorOf(util::string_view ancestor, util::string_view descendant) { + ancestor = RemoveTrailingSlash(ancestor); + if (ancestor == "") { + // everything is a descendant of the root directory + return true; + } + + descendant = RemoveTrailingSlash(descendant); + if (!descendant.starts_with(ancestor)) { + // an ancestor path is a prefix of descendant paths + return false; + } + + descendant.remove_prefix(ancestor.size()); + + if (descendant.empty()) { + // "/hello" is an ancestor of "/hello" + return true; + } + + // "/hello/w" is not an ancestor of "/hello/world" + return descendant.starts_with(std::string{kSep}); +} + +util::optional<util::string_view> RemoveAncestor(util::string_view ancestor, + util::string_view descendant) { + if (!IsAncestorOf(ancestor, descendant)) { + return util::nullopt; + } + + auto relative_to_ancestor = descendant.substr(ancestor.size()); + return 
RemoveLeadingSlash(relative_to_ancestor); +} + +std::vector<std::string> AncestorsFromBasePath(util::string_view base_path, + util::string_view descendant) { + std::vector<std::string> ancestry; + if (auto relative = RemoveAncestor(base_path, descendant)) { + auto relative_segments = fs::internal::SplitAbstractPath(std::string(*relative)); + + // the last segment indicates descendant + relative_segments.pop_back(); + + if (relative_segments.empty()) { + // no missing parent + return {}; + } + + for (auto&& relative_segment : relative_segments) { + ancestry.push_back(JoinAbstractPath( + std::vector<std::string>{std::string(base_path), std::move(relative_segment)})); + base_path = ancestry.back(); + } + } + return ancestry; +} + +std::vector<std::string> MinimalCreateDirSet(std::vector<std::string> dirs) { + std::sort(dirs.begin(), dirs.end()); + + for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) { + auto descendant = ancestor; + auto descendants_end = descendant + 1; + + while (descendants_end != dirs.end() && IsAncestorOf(*descendant, *descendants_end)) { + ++descendant; + ++descendants_end; + } + + ancestor = dirs.erase(ancestor, descendants_end - 1); + } + + // the root directory need not be created + if (dirs.size() == 1 && IsAncestorOf(dirs[0], "")) { + return {}; + } + + return dirs; +} + +std::string ToBackslashes(util::string_view v) { + std::string s(v); + for (auto& c : s) { + if (c == '/') { + c = '\\'; + } + } + return s; +} + +std::string ToSlashes(util::string_view v) { + std::string s(v); +#ifdef _WIN32 + for (auto& c : s) { + if (c == '\\') { + c = '/'; + } + } +#endif + return s; +} + +bool IsEmptyPath(util::string_view v) { + for (const auto c : v) { + if (c != '/') { + return false; + } + } + return true; +} + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h new file 
mode 100644 index 00000000000..5701c11b5d8 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/path_util.h @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> +#include <utility> +#include <vector> + +#include "arrow/type_fwd.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" + +namespace arrow { +namespace fs { +namespace internal { + +constexpr char kSep = '/'; + +// Computations on abstract paths (not local paths with system-dependent behaviour). +// Abstract paths are typically used in URIs. + +// Split an abstract path into its individual components. +ARROW_EXPORT +std::vector<std::string> SplitAbstractPath(const std::string& s); + +// Return the extension of the file +ARROW_EXPORT +std::string GetAbstractPathExtension(const std::string& s); + +// Return the parent directory and basename of an abstract path. Both values may be +// empty. +ARROW_EXPORT +std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s); + +// Validate the components of an abstract path. +ARROW_EXPORT +Status ValidateAbstractPathParts(const std::vector<std::string>& parts); + +// Append a non-empty stem to an abstract path. 
+ARROW_EXPORT +std::string ConcatAbstractPath(const std::string& base, const std::string& stem); + +// Make path relative to base, if it starts with base. Otherwise error out. +ARROW_EXPORT +Result<std::string> MakeAbstractPathRelative(const std::string& base, + const std::string& path); + +ARROW_EXPORT +std::string EnsureLeadingSlash(util::string_view s); + +ARROW_EXPORT +util::string_view RemoveLeadingSlash(util::string_view s); + +ARROW_EXPORT +std::string EnsureTrailingSlash(util::string_view s); + +ARROW_EXPORT +util::string_view RemoveTrailingSlash(util::string_view s); + +ARROW_EXPORT +bool IsAncestorOf(util::string_view ancestor, util::string_view descendant); + +ARROW_EXPORT +util::optional<util::string_view> RemoveAncestor(util::string_view ancestor, + util::string_view descendant); + +/// Return a vector of ancestors between a base path and a descendant. +/// For example, +/// +/// AncestorsFromBasePath("a/b", "a/b/c/d/e") -> ["a/b/c", "a/b/c/d"] +ARROW_EXPORT +std::vector<std::string> AncestorsFromBasePath(util::string_view base_path, + util::string_view descendant); + +/// Given a vector of paths of directories which must be created, produce a the minimal +/// subset for passing to CreateDir(recursive=true) by removing redundant parent +/// directories +ARROW_EXPORT +std::vector<std::string> MinimalCreateDirSet(std::vector<std::string> dirs); + +// Join the components of an abstract path. +template <class StringIt> +std::string JoinAbstractPath(StringIt it, StringIt end) { + std::string path; + for (; it != end; ++it) { + if (it->empty()) continue; + + if (!path.empty()) { + path += kSep; + } + path += *it; + } + return path; +} + +template <class StringRange> +std::string JoinAbstractPath(const StringRange& range) { + return JoinAbstractPath(range.begin(), range.end()); +} + +/// Convert slashes to backslashes, on all platforms. Mostly useful for testing. 
+ARROW_EXPORT +std::string ToBackslashes(util::string_view s); + +/// Ensure a local path is abstract, by converting backslashes to regular slashes +/// on Windows. Return the path unchanged on other systems. +ARROW_EXPORT +std::string ToSlashes(util::string_view s); + +ARROW_EXPORT +bool IsEmptyPath(util::string_view s); + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h new file mode 100644 index 00000000000..112563577db --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/type_fwd.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +namespace arrow { +namespace fs { + +/// \brief FileSystem entry type +enum class FileType : int8_t { + /// Entry is not found + NotFound, + /// Entry exists but its type is unknown + /// + /// This can designate a special file such as a Unix socket or character + /// device, or Windows NUL / CON / ... 
+ Unknown, + /// Entry is a regular file + File, + /// Entry is a directory + Directory +}; + +struct FileInfo; + +struct FileSelector; + +class FileSystem; +class SubTreeFileSystem; +class SlowFileSystem; +class LocalFileSystem; +class S3FileSystem; + +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc new file mode 100644 index 00000000000..8f86707375d --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.cc @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/filesystem/util_internal.h" +#include "arrow/buffer.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace fs { +namespace internal { + +TimePoint CurrentTimePoint() { + auto now = std::chrono::system_clock::now(); + return TimePoint( + std::chrono::duration_cast<TimePoint::duration>(now.time_since_epoch())); +} + +Status CopyStream(const std::shared_ptr<io::InputStream>& src, + const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size, + const io::IOContext& io_context) { + ARROW_ASSIGN_OR_RAISE(auto chunk, AllocateBuffer(chunk_size, io_context.pool())); + + while (true) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + src->Read(chunk_size, chunk->mutable_data())); + if (bytes_read == 0) { + // EOF + break; + } + RETURN_NOT_OK(dest->Write(chunk->data(), bytes_read)); + } + + return Status::OK(); +} + +Status PathNotFound(const std::string& path) { + return Status::IOError("Path does not exist '", path, "'"); +} + +Status NotADir(const std::string& path) { + return Status::IOError("Not a directory: '", path, "'"); +} + +Status NotAFile(const std::string& path) { + return Status::IOError("Not a regular file: '", path, "'"); +} + +Status InvalidDeleteDirContents(const std::string& path) { + return Status::Invalid( + "DeleteDirContents called on invalid path '", path, "'. ", + "If you wish to delete the root directory's contents, call DeleteRootDirContents."); +} + +FileSystemGlobalOptions global_options; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h new file mode 100644 index 00000000000..915c8d03d46 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/util_internal.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <memory> + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/interfaces.h" +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace fs { +namespace internal { + +ARROW_EXPORT +TimePoint CurrentTimePoint(); + +ARROW_EXPORT +Status CopyStream(const std::shared_ptr<io::InputStream>& src, + const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size, + const io::IOContext& io_context); + +ARROW_EXPORT +Status PathNotFound(const std::string& path); + +ARROW_EXPORT +Status NotADir(const std::string& path); + +ARROW_EXPORT +Status NotAFile(const std::string& path); + +ARROW_EXPORT +Status InvalidDeleteDirContents(const std::string& path); + +extern FileSystemGlobalOptions global_options; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc b/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc new file mode 100644 index 00000000000..117b82df30d --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/ipc/json_simple.cc @@ -0,0 +1,940 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
// See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <sstream>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/array/array_dict.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_decimal.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_time.h"
#include "arrow/array/builder_union.h"
#include "arrow/ipc/json_simple.h"
#include "arrow/scalar.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/value_parsing.h"

#include "arrow/json/rapidjson_defs.h"

#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/reader.h>
#include <rapidjson/writer.h>

namespace rj = arrow::rapidjson;

namespace arrow {

using internal::ParseValue;

namespace ipc {
namespace internal {
namespace json {

using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;

namespace {

// RapidJSON parse flags: full-precision number parsing plus NaN/Inf literals.
constexpr auto kParseFlags = rj::kParseFullPrecisionFlag | rj::kParseNanAndInfFlag;

// Build the Invalid status used whenever a JSON value has the wrong type.
// `json_type` is the rapidjson type tag of the offending value.
Status JSONTypeError(const char* expected_type, rj::Type json_type) {
  return Status::Invalid("Expected ", expected_type, " or null, got JSON type ",
                         json_type);
}

// Abstract base of all converters: each one appends JSON values to the
// ArrayBuilder it exposes through builder().
class Converter {
 public:
  virtual ~Converter() = default;

  // Optional second construction step; the default does nothing.
  virtual Status Init() { return Status::OK(); }

  // Append a single JSON value.
  virtual Status AppendValue(const rj::Value& json_obj) = 0;

  Status AppendNull() { return this->builder()->AppendNull(); }

  // Append every element of a JSON array.
  virtual Status AppendValues(const rj::Value& json_array) = 0;

  virtual std::shared_ptr<ArrayBuilder> builder() = 0;

  // Finish the underlying builder into `out`.
  virtual Status Finish(std::shared_ptr<Array>* out) {
    auto builder = this->builder();
    if (builder->length() == 0) {
      // Make sure the builder was initialized
      RETURN_NOT_OK(builder->Resize(1));
    }
    return builder->Finish(out);
  }

 protected:
  std::shared_ptr<DataType> type_;
};

// Forward declaration: nested converters obtain their child converters
// through this factory.
Status GetConverter(const std::shared_ptr<DataType>&, std::shared_ptr<Converter>* out);

// CRTP
// Implements AppendValues() by calling Derived::AppendValue() on each element.
template <class Derived>
class ConcreteConverter : public Converter {
 public:
  Status AppendValues(const rj::Value& json_array) override {
    auto self = static_cast<Derived*>(this);
    if (!json_array.IsArray()) {
      return JSONTypeError("array", json_array.GetType());
    }
    auto size = json_array.Size();
    for (uint32_t i = 0; i < size; ++i) {
      RETURN_NOT_OK(self->AppendValue(json_array[i]));
    }
    return Status::OK();
  }

  // The converter's own type, or the dictionary's value type when the
  // converter was created for a dictionary type.
  const std::shared_ptr<DataType>& value_type() {
    if (type_->id() != Type::DICTIONARY) {
      return type_;
    }
    return checked_cast<const DictionaryType&>(*type_).value_type();
  }

  // Create a builder for type_ with MakeBuilder() and downcast it to
  // BuilderType.
  template <typename BuilderType>
  Status MakeConcreteBuilder(std::shared_ptr<BuilderType>* out) {
    std::unique_ptr<ArrayBuilder> builder;
    RETURN_NOT_OK(MakeBuilder(default_memory_pool(), this->type_, &builder));
    *out = checked_pointer_cast<BuilderType>(std::move(builder));
    DCHECK(*out);
    return Status::OK();
  }
};

// ------------------------------------------------------------------------
// Converter for null arrays

class NullConverter final : public ConcreteConverter<NullConverter> {
 public:
  explicit NullConverter(const std::shared_ptr<DataType>& type) {
    type_ = type;
    builder_ = std::make_shared<NullBuilder>();
  }

  // Only JSON null is accepted.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return AppendNull();
    }
    return JSONTypeError("null", json_obj.GetType());
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<NullBuilder> builder_;
};

// ------------------------------------------------------------------------
// Converter for boolean arrays

class BooleanConverter final : public ConcreteConverter<BooleanConverter> {
 public:
  explicit BooleanConverter(const std::shared_ptr<DataType>& type) {
    type_ = type;
    builder_ = std::make_shared<BooleanBuilder>();
  }

  // Accepts null, JSON booleans, and JSON ints (non-zero means true).
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return AppendNull();
    }
    if (json_obj.IsBool()) {
      return builder_->Append(json_obj.GetBool());
    }
    if (json_obj.IsInt()) {
      return builder_->Append(json_obj.GetInt() != 0);
    }
    return JSONTypeError("boolean", json_obj.GetType());
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BooleanBuilder> builder_;
};

// ------------------------------------------------------------------------
// Helpers for numeric converters

// Convert single signed integer value (also {Date,Time}{32,64} and Timestamp)
template <typename T>
enable_if_physical_signed_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
                                                           const DataType& type,
                                                           typename T::c_type* out) {
  if (json_obj.IsInt64()) {
    int64_t v64 = json_obj.GetInt64();
    *out = static_cast<typename T::c_type>(v64);
    // The narrowed value must compare equal to the original, otherwise the
    // input did not fit the target type.
    if (*out == v64) {
      return Status::OK();
    } else {
      return Status::Invalid("Value ", v64, " out of bounds for ", type);
    }
  } else {
    *out = static_cast<typename T::c_type>(0);
    return JSONTypeError("signed int", json_obj.GetType());
  }
}

// Convert single unsigned integer value
template <typename T>
enable_if_physical_unsigned_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
                                                             const DataType& type,
                                                             typename T::c_type* out) {
  if (json_obj.IsUint64()) {
    uint64_t v64 = json_obj.GetUint64();
    *out = static_cast<typename T::c_type>(v64);
    // Same round-trip range check as the signed overload.
    if (*out == v64) {
      return Status::OK();
    } else {
      return Status::Invalid("Value ", v64, " out of bounds for ", type);
    }
  } else {
    *out = static_cast<typename T::c_type>(0);
    return JSONTypeError("unsigned int", json_obj.GetType());
  }
}

// Convert single floating point value
template <typename T>
enable_if_physical_floating_point<T, Status> ConvertNumber(const rj::Value& json_obj,
                                                           const DataType& type,
                                                           typename T::c_type* out) {
  if (json_obj.IsNumber()) {
    *out = static_cast<typename T::c_type>(json_obj.GetDouble());
    return Status::OK();
  } else {
    *out = static_cast<typename T::c_type>(0);
    return JSONTypeError("number", json_obj.GetType());
  }
}

// ------------------------------------------------------------------------
// Converter for int arrays

template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
class IntegerConverter final
    : public ConcreteConverter<IntegerConverter<Type, BuilderType>> {
  using c_type = typename Type::c_type;

  // NOTE(review): not referenced anywhere in the visible part of this class.
  static constexpr auto is_signed = std::is_signed<c_type>::value;

 public:
  explicit IntegerConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }

  Status Init() override { return this->MakeConcreteBuilder(&builder_); }

  // Accepts null or an integer that fits c_type (checked by ConvertNumber).
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    c_type value;
    RETURN_NOT_OK(ConvertNumber<Type>(json_obj, *this->type_, &value));
    return builder_->Append(value);
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
};

// ------------------------------------------------------------------------
// Converter for float arrays

template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
class FloatConverter final : public ConcreteConverter<FloatConverter<Type, BuilderType>> {
  using c_type = typename Type::c_type;

 public:
  explicit FloatConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }

  Status Init() override { return this->MakeConcreteBuilder(&builder_); }

  // Accepts null or any JSON number.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    c_type value;
    RETURN_NOT_OK(ConvertNumber<Type>(json_obj, *this->type_, &value));
    return builder_->Append(value);
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
};

// ------------------------------------------------------------------------
// Converter for decimal arrays

template <typename DecimalSubtype, typename DecimalValue, typename BuilderType>
class DecimalConverter final
    : public ConcreteConverter<
          DecimalConverter<DecimalSubtype, DecimalValue, BuilderType>> {
 public:
  explicit DecimalConverter(const std::shared_ptr<DataType>& type) {
    this->type_ = type;
    decimal_type_ = &checked_cast<const DecimalSubtype&>(*this->value_type());
  }

  Status Init() override { return this->MakeConcreteBuilder(&builder_); }

  // Accepts null or a decimal written as a JSON string whose scale matches
  // the scale of the target decimal type.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    if (json_obj.IsString()) {
      int32_t precision, scale;
      DecimalValue d;
      auto view = util::string_view(json_obj.GetString(), json_obj.GetStringLength());
      RETURN_NOT_OK(DecimalValue::FromString(view, &d, &precision, &scale));
      if (scale != decimal_type_->scale()) {
        return Status::Invalid("Invalid scale for decimal: expected ",
                               decimal_type_->scale(), ", got ", scale);
      }
      return builder_->Append(d);
    }
    return JSONTypeError("decimal string", json_obj.GetType());
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
  // Non-owning pointer into the type held by type_.
  const DecimalSubtype* decimal_type_;
};

template <typename BuilderType = typename TypeTraits<Decimal128Type>::BuilderType>
using Decimal128Converter = DecimalConverter<Decimal128Type, Decimal128, BuilderType>;
template <typename BuilderType = typename TypeTraits<Decimal256Type>::BuilderType>
using Decimal256Converter = DecimalConverter<Decimal256Type, Decimal256, BuilderType>;

// ------------------------------------------------------------------------
// Converter for timestamp arrays

class TimestampConverter final : public ConcreteConverter<TimestampConverter> {
 public:
  explicit TimestampConverter(const std::shared_ptr<DataType>& type)
      : timestamp_type_{checked_cast<const TimestampType*>(type.get())} {
    this->type_ = type;
    builder_ = std::make_shared<TimestampBuilder>(type, default_memory_pool());
  }

  // Accepts null, a JSON number (converted as a signed 64-bit integer), or a
  // string parsed by ParseValue() for the timestamp type.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    int64_t value;
    if (json_obj.IsNumber()) {
      RETURN_NOT_OK(ConvertNumber<Int64Type>(json_obj, *this->type_, &value));
    } else if (json_obj.IsString()) {
      util::string_view view(json_obj.GetString(), json_obj.GetStringLength());
      if (!ParseValue(*timestamp_type_, view.data(), view.size(), &value)) {
        return Status::Invalid("couldn't parse timestamp from ", view);
      }
    } else {
      return JSONTypeError("timestamp", json_obj.GetType());
    }
    return builder_->Append(value);
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  // Non-owning pointer into the type held by type_.
  const TimestampType* timestamp_type_;
  std::shared_ptr<TimestampBuilder> builder_;
};

// ------------------------------------------------------------------------
// Converter for day-time interval arrays

class DayTimeIntervalConverter final
    : public ConcreteConverter<DayTimeIntervalConverter> {
 public:
  explicit DayTimeIntervalConverter(const std::shared_ptr<DataType>& type) {
    this->type_ = type;
    builder_ = std::make_shared<DayTimeIntervalBuilder>(default_memory_pool());
  }

  // Accepts null or a two-element JSON array [days, milliseconds], each
  // element converted as a 32-bit signed integer.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    DayTimeIntervalType::DayMilliseconds value;
    if (!json_obj.IsArray()) {
      return JSONTypeError("array", json_obj.GetType());
    }
    if (json_obj.Size() != 2) {
      return Status::Invalid(
          "day time interval pair must have exactly two elements, had ", json_obj.Size());
    }
    RETURN_NOT_OK(ConvertNumber<Int32Type>(json_obj[0], *this->type_, &value.days));
    RETURN_NOT_OK(
        ConvertNumber<Int32Type>(json_obj[1], *this->type_, &value.milliseconds));
    return builder_->Append(value);
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<DayTimeIntervalBuilder> builder_;
};

// ------------------------------------------------------------------------
// Converter for binary and string arrays

template <typename Type, typename BuilderType = typename TypeTraits<Type>::BuilderType>
class StringConverter final
    : public ConcreteConverter<StringConverter<Type, BuilderType>> {
 public:
  explicit StringConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }

  Status Init() override { return this->MakeConcreteBuilder(&builder_); }

  // Accepts null or a JSON string; the explicit length is used, so the
  // string is not truncated at an embedded NUL.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    if (json_obj.IsString()) {
      auto view = util::string_view(json_obj.GetString(), json_obj.GetStringLength());
      return builder_->Append(view);
    } else {
      return JSONTypeError("string", json_obj.GetType());
    }
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
};

//
// ------------------------------------------------------------------------
// Converter for fixed-size binary arrays

template <typename BuilderType = typename TypeTraits<FixedSizeBinaryType>::BuilderType>
class FixedSizeBinaryConverter final
    : public ConcreteConverter<FixedSizeBinaryConverter<BuilderType>> {
 public:
  explicit FixedSizeBinaryConverter(const std::shared_ptr<DataType>& type) {
    this->type_ = type;
  }

  Status Init() override { return this->MakeConcreteBuilder(&builder_); }

  // Accepts null or a JSON string whose length equals the builder's byte width.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    if (json_obj.IsString()) {
      auto view = util::string_view(json_obj.GetString(), json_obj.GetStringLength());
      if (view.length() != static_cast<size_t>(builder_->byte_width())) {
        std::stringstream ss;
        ss << "Invalid string length " << view.length() << " in JSON input for "
           << this->type_->ToString();
        return Status::Invalid(ss.str());
      }
      return builder_->Append(view);
    } else {
      return JSONTypeError("string", json_obj.GetType());
    }
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
};

// ------------------------------------------------------------------------
// Converter for list arrays

template <typename TYPE>
class ListConverter final : public ConcreteConverter<ListConverter<TYPE>> {
 public:
  using BuilderType = typename TypeTraits<TYPE>::BuilderType;

  explicit ListConverter(const std::shared_ptr<DataType>& type) { this->type_ = type; }

  // Create the child converter first; the list builder is constructed on top
  // of the child converter's builder.
  Status Init() override {
    const auto& list_type = checked_cast<const TYPE&>(*this->type_);
    RETURN_NOT_OK(GetConverter(list_type.value_type(), &child_converter_));
    auto child_builder = child_converter_->builder();
    builder_ =
        std::make_shared<BuilderType>(default_memory_pool(), child_builder, this->type_);
    return Status::OK();
  }

  // Accepts null or a JSON array of child values.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    RETURN_NOT_OK(builder_->Append());
    // Extend the child converter with this JSON array
    return child_converter_->AppendValues(json_obj);
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<BuilderType> builder_;
  std::shared_ptr<Converter> child_converter_;
};

// ------------------------------------------------------------------------
// Converter for map arrays

class MapConverter final : public ConcreteConverter<MapConverter> {
 public:
  explicit MapConverter(const std::shared_ptr<DataType>& type) { type_ = type; }

  // Create key and item converters; the map builder is constructed on top of
  // their builders.
  Status Init() override {
    const auto& map_type = checked_cast<const MapType&>(*type_);
    RETURN_NOT_OK(GetConverter(map_type.key_type(), &key_converter_));
    RETURN_NOT_OK(GetConverter(map_type.item_type(), &item_converter_));
    auto key_builder = key_converter_->builder();
    auto item_builder = item_converter_->builder();
    builder_ = std::make_shared<MapBuilder>(default_memory_pool(), key_builder,
                                            item_builder, type_);
    return Status::OK();
  }

  // Accepts null or a JSON array of [key, item] pairs; null keys are rejected.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    // NOTE(review): the map slot is opened before the input is validated, so
    // an error below returns with that slot already appended.
    RETURN_NOT_OK(builder_->Append());
    if (!json_obj.IsArray()) {
      return JSONTypeError("array", json_obj.GetType());
    }
    auto size = json_obj.Size();
    for (uint32_t i = 0; i < size; ++i) {
      const auto& json_pair = json_obj[i];
      if (!json_pair.IsArray()) {
        return JSONTypeError("array", json_pair.GetType());
      }
      if (json_pair.Size() != 2) {
        return Status::Invalid("key item pair must have exactly two elements, had ",
                               json_pair.Size());
      }
      if (json_pair[0].IsNull()) {
        return Status::Invalid("null key is invalid");
      }
      RETURN_NOT_OK(key_converter_->AppendValue(json_pair[0]));
      RETURN_NOT_OK(item_converter_->AppendValue(json_pair[1]));
    }
    return Status::OK();
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<MapBuilder> builder_;
  std::shared_ptr<Converter> key_converter_, item_converter_;
};

// ------------------------------------------------------------------------
// Converter for fixed size list arrays

class FixedSizeListConverter final : public ConcreteConverter<FixedSizeListConverter> {
 public:
  explicit FixedSizeListConverter(const std::shared_ptr<DataType>& type) { type_ = type; }

  // Record the fixed list size and build the list builder on top of the
  // child converter's builder.
  Status Init() override {
    const auto& list_type = checked_cast<const FixedSizeListType&>(*type_);
    list_size_ = list_type.list_size();
    RETURN_NOT_OK(GetConverter(list_type.value_type(), &child_converter_));
    auto child_builder = child_converter_->builder();
    builder_ = std::make_shared<FixedSizeListBuilder>(default_memory_pool(),
                                                      child_builder, type_);
    return Status::OK();
  }

  // Accepts null or a JSON array with exactly list_size_ elements.
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    RETURN_NOT_OK(builder_->Append());
    // Extend the child converter with this JSON array
    RETURN_NOT_OK(child_converter_->AppendValues(json_obj));
    // AppendValues() has already rejected non-array input, so GetArray() is
    // only reached for arrays.  The length is checked after the children
    // were appended.
    if (json_obj.GetArray().Size() != static_cast<rj::SizeType>(list_size_)) {
      return Status::Invalid("incorrect list size ", json_obj.GetArray().Size());
    }
    return Status::OK();
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  int32_t list_size_;
  std::shared_ptr<FixedSizeListBuilder> builder_;
  std::shared_ptr<Converter> child_converter_;
};

// ------------------------------------------------------------------------
// Converter for struct arrays

class StructConverter final : public ConcreteConverter<StructConverter> {
 public:
  explicit StructConverter(const std::shared_ptr<DataType>& type) { type_ = type; }

  // Create one child converter per struct field, in field order, and hand
  // their builders to the struct builder.
  Status Init() override {
    std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
    for (const auto& field : type_->fields()) {
      std::shared_ptr<Converter> child_converter;
      RETURN_NOT_OK(GetConverter(field->type(), &child_converter));
      child_converters_.push_back(child_converter);
      child_builders.push_back(child_converter->builder());
    }
    builder_ = std::make_shared<StructBuilder>(type_, default_memory_pool(),
                                               std::move(child_builders));
    return Status::OK();
  }

  // Append a JSON value that is either an array of N elements in order
  // or an object mapping struct names to values (omitted struct members
  // are mapped to null).
  Status AppendValue(const rj::Value& json_obj) override {
    if (json_obj.IsNull()) {
      return this->AppendNull();
    }
    if (json_obj.IsArray()) {
      auto size = json_obj.Size();
      auto expected_size = static_cast<uint32_t>(type_->num_fields());
      if (size != expected_size) {
        return Status::Invalid("Expected array of size ", expected_size,
                               ", got array of size ", size);
      }
      for (uint32_t i = 0; i < size; ++i) {
        RETURN_NOT_OK(child_converters_[i]->AppendValue(json_obj[i]));
      }
      return builder_->Append();
    }
    if (json_obj.IsObject()) {
      // Count down the members matched to a field; anything left over is a
      // member that matched no field.
      auto remaining = json_obj.MemberCount();
      auto num_children = type_->num_fields();
      for (int32_t i = 0; i < num_children; ++i) {
        const auto& field = type_->field(i);
        auto it = json_obj.FindMember(field->name());
        if (it != json_obj.MemberEnd()) {
          --remaining;
          RETURN_NOT_OK(child_converters_[i]->AppendValue(it->value));
        } else {
          RETURN_NOT_OK(child_converters_[i]->AppendNull());
        }
      }
      if (remaining > 0) {
        // Serialize the offending object back to JSON for the error message.
        rj::StringBuffer sb;
        rj::Writer<rj::StringBuffer> writer(sb);
        json_obj.Accept(writer);
        return Status::Invalid("Unexpected members in JSON object for type ",
                               type_->ToString(), " Object: ", sb.GetString());
      }
      return builder_->Append();
    }
    return JSONTypeError("array or object", json_obj.GetType());
  }

  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }

 private:
  std::shared_ptr<StructBuilder> builder_;
  // One converter per struct field, in field order.
  std::vector<std::shared_ptr<Converter>> child_converters_;
};

//
------------------------------------------------------------------------ +// Converter for union arrays + +class UnionConverter final : public ConcreteConverter<UnionConverter> { + public: + explicit UnionConverter(const std::shared_ptr<DataType>& type) { type_ = type; } + + Status Init() override { + auto union_type = checked_cast<const UnionType*>(type_.get()); + mode_ = union_type->mode(); + type_id_to_child_num_.clear(); + type_id_to_child_num_.resize(union_type->max_type_code() + 1, -1); + int child_i = 0; + for (auto type_id : union_type->type_codes()) { + type_id_to_child_num_[type_id] = child_i++; + } + std::vector<std::shared_ptr<ArrayBuilder>> child_builders; + for (const auto& field : type_->fields()) { + std::shared_ptr<Converter> child_converter; + RETURN_NOT_OK(GetConverter(field->type(), &child_converter)); + child_converters_.push_back(child_converter); + child_builders.push_back(child_converter->builder()); + } + if (mode_ == UnionMode::DENSE) { + builder_ = std::make_shared<DenseUnionBuilder>(default_memory_pool(), + std::move(child_builders), type_); + } else { + builder_ = std::make_shared<SparseUnionBuilder>(default_memory_pool(), + std::move(child_builders), type_); + } + return Status::OK(); + } + + // Append a JSON value that must be a 2-long array, containing the type_id + // and value of the UnionArray's slot. 
+ Status AppendValue(const rj::Value& json_obj) override { + if (json_obj.IsNull()) { + return this->AppendNull(); + } + if (!json_obj.IsArray()) { + return JSONTypeError("array", json_obj.GetType()); + } + if (json_obj.Size() != 2) { + return Status::Invalid("Expected [type_id, value] pair, got array of size ", + json_obj.Size()); + } + const auto& id_obj = json_obj[0]; + if (!id_obj.IsInt()) { + return JSONTypeError("int", id_obj.GetType()); + } + + auto id = static_cast<int8_t>(id_obj.GetInt()); + auto child_num = type_id_to_child_num_[id]; + if (child_num == -1) { + return Status::Invalid("type_id ", id, " not found in ", *type_); + } + + auto child_converter = child_converters_[child_num]; + if (mode_ == UnionMode::SPARSE) { + RETURN_NOT_OK(checked_cast<SparseUnionBuilder&>(*builder_).Append(id)); + for (auto&& other_converter : child_converters_) { + if (other_converter != child_converter) { + RETURN_NOT_OK(other_converter->AppendNull()); + } + } + } else { + RETURN_NOT_OK(checked_cast<DenseUnionBuilder&>(*builder_).Append(id)); + } + return child_converter->AppendValue(json_obj[1]); + } + + std::shared_ptr<ArrayBuilder> builder() override { return builder_; } + + private: + UnionMode::type mode_; + std::shared_ptr<ArrayBuilder> builder_; + std::vector<std::shared_ptr<Converter>> child_converters_; + std::vector<int8_t> type_id_to_child_num_; +}; + +// ------------------------------------------------------------------------ +// General conversion functions + +Status ConversionNotImplemented(const std::shared_ptr<DataType>& type) { + return Status::NotImplemented("JSON conversion to ", type->ToString(), + " not implemented"); +} + +Status GetDictConverter(const std::shared_ptr<DataType>& type, + std::shared_ptr<Converter>* out) { + std::shared_ptr<Converter> res; + + const auto value_type = checked_cast<const DictionaryType&>(*type).value_type(); + +#define SIMPLE_CONVERTER_CASE(ID, CLASS, TYPE) \ + case ID: \ + res = 
std::make_shared<CLASS<DictionaryBuilder<TYPE>>>(type); \ + break; + +#define PARAM_CONVERTER_CASE(ID, CLASS, TYPE) \ + case ID: \ + res = std::make_shared<CLASS<TYPE, DictionaryBuilder<TYPE>>>(type); \ + break; + + switch (value_type->id()) { + PARAM_CONVERTER_CASE(Type::INT8, IntegerConverter, Int8Type) + PARAM_CONVERTER_CASE(Type::INT16, IntegerConverter, Int16Type) + PARAM_CONVERTER_CASE(Type::INT32, IntegerConverter, Int32Type) + PARAM_CONVERTER_CASE(Type::INT64, IntegerConverter, Int64Type) + PARAM_CONVERTER_CASE(Type::UINT8, IntegerConverter, UInt8Type) + PARAM_CONVERTER_CASE(Type::UINT16, IntegerConverter, UInt16Type) + PARAM_CONVERTER_CASE(Type::UINT32, IntegerConverter, UInt32Type) + PARAM_CONVERTER_CASE(Type::UINT64, IntegerConverter, UInt64Type) + PARAM_CONVERTER_CASE(Type::FLOAT, FloatConverter, FloatType) + PARAM_CONVERTER_CASE(Type::DOUBLE, FloatConverter, DoubleType) + PARAM_CONVERTER_CASE(Type::STRING, StringConverter, StringType) + PARAM_CONVERTER_CASE(Type::BINARY, StringConverter, BinaryType) + PARAM_CONVERTER_CASE(Type::LARGE_STRING, StringConverter, LargeStringType) + PARAM_CONVERTER_CASE(Type::LARGE_BINARY, StringConverter, LargeBinaryType) + SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_BINARY, FixedSizeBinaryConverter, + FixedSizeBinaryType) + SIMPLE_CONVERTER_CASE(Type::DECIMAL128, Decimal128Converter, Decimal128Type) + SIMPLE_CONVERTER_CASE(Type::DECIMAL256, Decimal256Converter, Decimal256Type) + default: + return ConversionNotImplemented(type); + } + +#undef SIMPLE_CONVERTER_CASE +#undef PARAM_CONVERTER_CASE + + RETURN_NOT_OK(res->Init()); + *out = res; + return Status::OK(); +} + +Status GetConverter(const std::shared_ptr<DataType>& type, + std::shared_ptr<Converter>* out) { + if (type->id() == Type::DICTIONARY) { + return GetDictConverter(type, out); + } + + std::shared_ptr<Converter> res; + +#define SIMPLE_CONVERTER_CASE(ID, CLASS) \ + case ID: \ + res = std::make_shared<CLASS>(type); \ + break; + + switch (type->id()) { + 
SIMPLE_CONVERTER_CASE(Type::INT8, IntegerConverter<Int8Type>) + SIMPLE_CONVERTER_CASE(Type::INT16, IntegerConverter<Int16Type>) + SIMPLE_CONVERTER_CASE(Type::INT32, IntegerConverter<Int32Type>) + SIMPLE_CONVERTER_CASE(Type::INT64, IntegerConverter<Int64Type>) + SIMPLE_CONVERTER_CASE(Type::UINT8, IntegerConverter<UInt8Type>) + SIMPLE_CONVERTER_CASE(Type::UINT16, IntegerConverter<UInt16Type>) + SIMPLE_CONVERTER_CASE(Type::UINT32, IntegerConverter<UInt32Type>) + SIMPLE_CONVERTER_CASE(Type::UINT64, IntegerConverter<UInt64Type>) + SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, TimestampConverter) + SIMPLE_CONVERTER_CASE(Type::DATE32, IntegerConverter<Date32Type>) + SIMPLE_CONVERTER_CASE(Type::DATE64, IntegerConverter<Date64Type>) + SIMPLE_CONVERTER_CASE(Type::TIME32, IntegerConverter<Time32Type>) + SIMPLE_CONVERTER_CASE(Type::TIME64, IntegerConverter<Time64Type>) + SIMPLE_CONVERTER_CASE(Type::DURATION, IntegerConverter<DurationType>) + SIMPLE_CONVERTER_CASE(Type::NA, NullConverter) + SIMPLE_CONVERTER_CASE(Type::BOOL, BooleanConverter) + SIMPLE_CONVERTER_CASE(Type::HALF_FLOAT, IntegerConverter<HalfFloatType>) + SIMPLE_CONVERTER_CASE(Type::FLOAT, FloatConverter<FloatType>) + SIMPLE_CONVERTER_CASE(Type::DOUBLE, FloatConverter<DoubleType>) + SIMPLE_CONVERTER_CASE(Type::LIST, ListConverter<ListType>) + SIMPLE_CONVERTER_CASE(Type::LARGE_LIST, ListConverter<LargeListType>) + SIMPLE_CONVERTER_CASE(Type::MAP, MapConverter) + SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_LIST, FixedSizeListConverter) + SIMPLE_CONVERTER_CASE(Type::STRUCT, StructConverter) + SIMPLE_CONVERTER_CASE(Type::STRING, StringConverter<StringType>) + SIMPLE_CONVERTER_CASE(Type::BINARY, StringConverter<BinaryType>) + SIMPLE_CONVERTER_CASE(Type::LARGE_STRING, StringConverter<LargeStringType>) + SIMPLE_CONVERTER_CASE(Type::LARGE_BINARY, StringConverter<LargeBinaryType>) + SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_BINARY, FixedSizeBinaryConverter<>) + SIMPLE_CONVERTER_CASE(Type::DECIMAL128, Decimal128Converter<>) + 
SIMPLE_CONVERTER_CASE(Type::DECIMAL256, Decimal256Converter<>) + SIMPLE_CONVERTER_CASE(Type::SPARSE_UNION, UnionConverter) + SIMPLE_CONVERTER_CASE(Type::DENSE_UNION, UnionConverter) + SIMPLE_CONVERTER_CASE(Type::INTERVAL_MONTHS, IntegerConverter<MonthIntervalType>) + SIMPLE_CONVERTER_CASE(Type::INTERVAL_DAY_TIME, DayTimeIntervalConverter) + default: + return ConversionNotImplemented(type); + } + +#undef SIMPLE_CONVERTER_CASE + + RETURN_NOT_OK(res->Init()); + *out = res; + return Status::OK(); +} + +} // namespace + +Status ArrayFromJSON(const std::shared_ptr<DataType>& type, util::string_view json_string, + std::shared_ptr<Array>* out) { + std::shared_ptr<Converter> converter; + RETURN_NOT_OK(GetConverter(type, &converter)); + + rj::Document json_doc; + json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); + if (json_doc.HasParseError()) { + return Status::Invalid("JSON parse error at offset ", json_doc.GetErrorOffset(), ": ", + GetParseError_En(json_doc.GetParseError())); + } + + // The JSON document should be an array, append it + RETURN_NOT_OK(converter->AppendValues(json_doc)); + return converter->Finish(out); +} + +Status ArrayFromJSON(const std::shared_ptr<DataType>& type, + const std::string& json_string, std::shared_ptr<Array>* out) { + return ArrayFromJSON(type, util::string_view(json_string), out); +} + +Status ArrayFromJSON(const std::shared_ptr<DataType>& type, const char* json_string, + std::shared_ptr<Array>* out) { + return ArrayFromJSON(type, util::string_view(json_string), out); +} + +Status DictArrayFromJSON(const std::shared_ptr<DataType>& type, + util::string_view indices_json, + util::string_view dictionary_json, std::shared_ptr<Array>* out) { + if (type->id() != Type::DICTIONARY) { + return Status::TypeError("DictArrayFromJSON requires dictionary type, got ", *type); + } + + const auto& dictionary_type = checked_cast<const DictionaryType&>(*type); + + std::shared_ptr<Array> indices, dictionary; + 
RETURN_NOT_OK(ArrayFromJSON(dictionary_type.index_type(), indices_json, &indices)); + RETURN_NOT_OK( + ArrayFromJSON(dictionary_type.value_type(), dictionary_json, &dictionary)); + + return DictionaryArray::FromArrays(type, std::move(indices), std::move(dictionary)) + .Value(out); +} + +Status ScalarFromJSON(const std::shared_ptr<DataType>& type, + util::string_view json_string, std::shared_ptr<Scalar>* out) { + std::shared_ptr<Converter> converter; + RETURN_NOT_OK(GetConverter(type, &converter)); + + rj::Document json_doc; + json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); + if (json_doc.HasParseError()) { + return Status::Invalid("JSON parse error at offset ", json_doc.GetErrorOffset(), ": ", + GetParseError_En(json_doc.GetParseError())); + } + + std::shared_ptr<Array> array; + RETURN_NOT_OK(converter->AppendValue(json_doc)); + RETURN_NOT_OK(converter->Finish(&array)); + DCHECK_EQ(array->length(), 1); + ARROW_ASSIGN_OR_RAISE(*out, array->GetScalar(0)); + return Status::OK(); +} + +} // namespace json +} // namespace internal +} // namespace ipc +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc new file mode 100644 index 00000000000..040009c764f --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.cc @@ -0,0 +1,469 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/chunked_builder.h" + +#include <mutex> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/json/converter.h" +#include "arrow/table.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/task_group.h" + +namespace arrow { + +using internal::checked_cast; +using internal::TaskGroup; + +namespace json { + +class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder { + public: + NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, + std::shared_ptr<Converter> converter) + : ChunkedArrayBuilder(task_group), converter_(std::move(converter)) {} + + Status Finish(std::shared_ptr<ChunkedArray>* out) override { + RETURN_NOT_OK(task_group_->Finish()); + *out = std::make_shared<ChunkedArray>(std::move(chunks_), converter_->out_type()); + chunks_.clear(); + return Status::OK(); + } + + Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override { + RETURN_NOT_OK(task_group_->Finish()); + task_group_ = task_group; + return Status::OK(); + } + + protected: + ArrayVector chunks_; + std::mutex mutex_; + std::shared_ptr<Converter> converter_; +}; + +class TypedChunkedArrayBuilder + : public NonNestedChunkedArrayBuilder, + public std::enable_shared_from_this<TypedChunkedArrayBuilder> { + public: + using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder; + + void Insert(int64_t block_index, const std::shared_ptr<Field>&, + const 
std::shared_ptr<Array>& unconverted) override { + std::unique_lock<std::mutex> lock(mutex_); + if (chunks_.size() <= static_cast<size_t>(block_index)) { + chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr); + } + lock.unlock(); + + auto self = shared_from_this(); + + task_group_->Append([self, block_index, unconverted] { + std::shared_ptr<Array> converted; + RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted)); + std::unique_lock<std::mutex> lock(self->mutex_); + self->chunks_[block_index] = std::move(converted); + return Status::OK(); + }); + } +}; + +class InferringChunkedArrayBuilder + : public NonNestedChunkedArrayBuilder, + public std::enable_shared_from_this<InferringChunkedArrayBuilder> { + public: + InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, + const PromotionGraph* promotion_graph, + std::shared_ptr<Converter> converter) + : NonNestedChunkedArrayBuilder(task_group, std::move(converter)), + promotion_graph_(promotion_graph) {} + + void Insert(int64_t block_index, const std::shared_ptr<Field>& unconverted_field, + const std::shared_ptr<Array>& unconverted) override { + std::unique_lock<std::mutex> lock(mutex_); + if (chunks_.size() <= static_cast<size_t>(block_index)) { + chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr); + unconverted_.resize(chunks_.size(), nullptr); + unconverted_fields_.resize(chunks_.size(), nullptr); + } + unconverted_[block_index] = unconverted; + unconverted_fields_[block_index] = unconverted_field; + lock.unlock(); + ScheduleConvertChunk(block_index); + } + + void ScheduleConvertChunk(int64_t block_index) { + auto self = shared_from_this(); + task_group_->Append([self, block_index] { + return self->TryConvertChunk(static_cast<size_t>(block_index)); + }); + } + + Status TryConvertChunk(size_t block_index) { + std::unique_lock<std::mutex> lock(mutex_); + auto converter = converter_; + auto unconverted = unconverted_[block_index]; + auto unconverted_field = 
unconverted_fields_[block_index]; + std::shared_ptr<Array> converted; + + lock.unlock(); + Status st = converter->Convert(unconverted, &converted); + lock.lock(); + + if (converter != converter_) { + // another task promoted converter; reconvert + lock.unlock(); + ScheduleConvertChunk(block_index); + return Status::OK(); + } + + if (st.ok()) { + // conversion succeeded + chunks_[block_index] = std::move(converted); + return Status::OK(); + } + + auto promoted_type = + promotion_graph_->Promote(converter_->out_type(), unconverted_field); + if (promoted_type == nullptr) { + // converter failed, no promotion available + return st; + } + RETURN_NOT_OK(MakeConverter(promoted_type, converter_->pool(), &converter_)); + + size_t nchunks = chunks_.size(); + for (size_t i = 0; i < nchunks; ++i) { + if (i != block_index && chunks_[i]) { + // We're assuming the chunk was converted using the wrong type + // (which should be true unless the executor reorders tasks) + chunks_[i].reset(); + lock.unlock(); + ScheduleConvertChunk(i); + lock.lock(); + } + } + lock.unlock(); + ScheduleConvertChunk(block_index); + return Status::OK(); + } + + Status Finish(std::shared_ptr<ChunkedArray>* out) override { + RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out)); + unconverted_.clear(); + return Status::OK(); + } + + private: + ArrayVector unconverted_; + std::vector<std::shared_ptr<Field>> unconverted_fields_; + const PromotionGraph* promotion_graph_; +}; + +class ChunkedListArrayBuilder : public ChunkedArrayBuilder { + public: + ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool, + std::shared_ptr<ChunkedArrayBuilder> value_builder, + const std::shared_ptr<Field>& value_field) + : ChunkedArrayBuilder(task_group), + pool_(pool), + value_builder_(std::move(value_builder)), + value_field_(value_field) {} + + Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override { + RETURN_NOT_OK(task_group_->Finish()); + 
RETURN_NOT_OK(value_builder_->ReplaceTaskGroup(task_group)); + task_group_ = task_group; + return Status::OK(); + } + + void Insert(int64_t block_index, const std::shared_ptr<Field>&, + const std::shared_ptr<Array>& unconverted) override { + std::unique_lock<std::mutex> lock(mutex_); + + if (unconverted->type_id() == Type::NA) { + auto st = InsertNull(block_index, unconverted->length()); + if (!st.ok()) { + task_group_->Append([st] { return st; }); + } + return; + } + + DCHECK_EQ(unconverted->type_id(), Type::LIST); + const auto& list_array = checked_cast<const ListArray&>(*unconverted); + + if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) { + null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr); + offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr); + } + null_bitmap_chunks_[block_index] = unconverted->null_bitmap(); + offset_chunks_[block_index] = list_array.value_offsets(); + + value_builder_->Insert(block_index, list_array.list_type()->value_field(), + list_array.values()); + } + + Status Finish(std::shared_ptr<ChunkedArray>* out) override { + RETURN_NOT_OK(task_group_->Finish()); + + std::shared_ptr<ChunkedArray> value_array; + RETURN_NOT_OK(value_builder_->Finish(&value_array)); + + auto type = list(value_field_->WithType(value_array->type())->WithMetadata(nullptr)); + ArrayVector chunks(null_bitmap_chunks_.size()); + for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) { + auto value_chunk = value_array->chunk(static_cast<int>(i)); + auto length = offset_chunks_[i]->size() / sizeof(int32_t) - 1; + chunks[i] = std::make_shared<ListArray>(type, length, offset_chunks_[i], + value_chunk, null_bitmap_chunks_[i]); + } + + *out = std::make_shared<ChunkedArray>(std::move(chunks), type); + return Status::OK(); + } + + private: + // call from Insert() only, with mutex_ locked + Status InsertNull(int64_t block_index, int64_t length) { + value_builder_->Insert(block_index, value_field_, std::make_shared<NullArray>(0)); + 
+ ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_[block_index], + AllocateEmptyBitmap(length, pool_)); + + int64_t offsets_length = (length + 1) * sizeof(int32_t); + ARROW_ASSIGN_OR_RAISE(offset_chunks_[block_index], + AllocateBuffer(offsets_length, pool_)); + std::memset(offset_chunks_[block_index]->mutable_data(), 0, offsets_length); + + return Status::OK(); + } + + std::mutex mutex_; + MemoryPool* pool_; + std::shared_ptr<ChunkedArrayBuilder> value_builder_; + BufferVector offset_chunks_, null_bitmap_chunks_; + std::shared_ptr<Field> value_field_; +}; + +class ChunkedStructArrayBuilder : public ChunkedArrayBuilder { + public: + ChunkedStructArrayBuilder( + const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool, + const PromotionGraph* promotion_graph, + std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>> + name_builders) + : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) { + for (auto&& name_builder : name_builders) { + auto index = static_cast<int>(name_to_index_.size()); + name_to_index_.emplace(std::move(name_builder.first), index); + child_builders_.emplace_back(std::move(name_builder.second)); + } + } + + void Insert(int64_t block_index, const std::shared_ptr<Field>&, + const std::shared_ptr<Array>& unconverted) override { + std::unique_lock<std::mutex> lock(mutex_); + + if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) { + null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr); + chunk_lengths_.resize(null_bitmap_chunks_.size(), -1); + child_absent_.resize(null_bitmap_chunks_.size(), std::vector<bool>(0)); + } + null_bitmap_chunks_[block_index] = unconverted->null_bitmap(); + chunk_lengths_[block_index] = unconverted->length(); + + if (unconverted->type_id() == Type::NA) { + auto maybe_buffer = AllocateBitmap(unconverted->length(), pool_); + if (maybe_buffer.ok()) { + null_bitmap_chunks_[block_index] = *std::move(maybe_buffer); + 
std::memset(null_bitmap_chunks_[block_index]->mutable_data(), 0, + null_bitmap_chunks_[block_index]->size()); + } else { + Status st = maybe_buffer.status(); + task_group_->Append([st] { return st; }); + } + + // absent fields will be inserted at Finish + return; + } + + const auto& struct_array = checked_cast<const StructArray&>(*unconverted); + if (promotion_graph_ == nullptr) { + // If unexpected fields are ignored or result in an error then all parsers will emit + // columns exclusively in the ordering specified in ParseOptions::explicit_schema, + // so child_builders_ is immutable and no associative lookup is necessary. + for (int i = 0; i < unconverted->num_fields(); ++i) { + child_builders_[i]->Insert(block_index, unconverted->type()->field(i), + struct_array.field(i)); + } + } else { + auto st = InsertChildren(block_index, struct_array); + if (!st.ok()) { + return task_group_->Append([st] { return st; }); + } + } + } + + Status Finish(std::shared_ptr<ChunkedArray>* out) override { + RETURN_NOT_OK(task_group_->Finish()); + + if (promotion_graph_ != nullptr) { + // insert absent child chunks + for (auto&& name_index : name_to_index_) { + auto child_builder = child_builders_[name_index.second].get(); + + RETURN_NOT_OK(child_builder->ReplaceTaskGroup(TaskGroup::MakeSerial())); + + for (size_t i = 0; i < chunk_lengths_.size(); ++i) { + if (child_absent_[i].size() > static_cast<size_t>(name_index.second) && + !child_absent_[i][name_index.second]) { + continue; + } + auto empty = std::make_shared<NullArray>(chunk_lengths_[i]); + child_builder->Insert(i, promotion_graph_->Null(name_index.first), empty); + } + } + } + + std::vector<std::shared_ptr<Field>> fields(name_to_index_.size()); + std::vector<std::shared_ptr<ChunkedArray>> child_arrays(name_to_index_.size()); + for (auto&& name_index : name_to_index_) { + auto child_builder = child_builders_[name_index.second].get(); + + std::shared_ptr<ChunkedArray> child_array; + 
RETURN_NOT_OK(child_builder->Finish(&child_array)); + + child_arrays[name_index.second] = child_array; + fields[name_index.second] = field(name_index.first, child_array->type()); + } + + auto type = struct_(std::move(fields)); + ArrayVector chunks(null_bitmap_chunks_.size()); + for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) { + ArrayVector child_chunks; + for (const auto& child_array : child_arrays) { + child_chunks.push_back(child_array->chunk(static_cast<int>(i))); + } + chunks[i] = std::make_shared<StructArray>(type, chunk_lengths_[i], child_chunks, + null_bitmap_chunks_[i]); + } + + *out = std::make_shared<ChunkedArray>(std::move(chunks), type); + return Status::OK(); + } + + Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override { + RETURN_NOT_OK(task_group_->Finish()); + for (auto&& child_builder : child_builders_) { + RETURN_NOT_OK(child_builder->ReplaceTaskGroup(task_group)); + } + task_group_ = task_group; + return Status::OK(); + } + + private: + // Insert children associatively by name; the unconverted block may have unexpected or + // differently ordered fields + // call from Insert() only, with mutex_ locked + Status InsertChildren(int64_t block_index, const StructArray& unconverted) { + const auto& fields = unconverted.type()->fields(); + + for (int i = 0; i < unconverted.num_fields(); ++i) { + auto it = name_to_index_.find(fields[i]->name()); + + if (it == name_to_index_.end()) { + // add a new field to this builder + auto type = promotion_graph_->Infer(fields[i]); + DCHECK_NE(type, nullptr) + << "invalid unconverted_field encountered in conversion: " + << fields[i]->name() << ":" << *fields[i]->type(); + + auto new_index = static_cast<int>(name_to_index_.size()); + it = name_to_index_.emplace(fields[i]->name(), new_index).first; + + std::shared_ptr<ChunkedArrayBuilder> child_builder; + RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type, + &child_builder)); + 
child_builders_.emplace_back(std::move(child_builder)); + } + + auto unconverted_field = unconverted.type()->field(i); + child_builders_[it->second]->Insert(block_index, unconverted_field, + unconverted.field(i)); + + child_absent_[block_index].resize(child_builders_.size(), true); + child_absent_[block_index][it->second] = false; + } + + return Status::OK(); + } + + std::mutex mutex_; + MemoryPool* pool_; + const PromotionGraph* promotion_graph_; + std::unordered_map<std::string, int> name_to_index_; + std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_; + std::vector<std::vector<bool>> child_absent_; + BufferVector null_bitmap_chunks_; + std::vector<int64_t> chunk_lengths_; +}; + +Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, + MemoryPool* pool, const PromotionGraph* promotion_graph, + const std::shared_ptr<DataType>& type, + std::shared_ptr<ChunkedArrayBuilder>* out) { + if (type->id() == Type::STRUCT) { + std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>> + child_builders; + for (const auto& f : type->fields()) { + std::shared_ptr<ChunkedArrayBuilder> child_builder; + RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(), + &child_builder)); + child_builders.emplace_back(f->name(), std::move(child_builder)); + } + *out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph, + std::move(child_builders)); + return Status::OK(); + } + if (type->id() == Type::LIST) { + const auto& list_type = checked_cast<const ListType&>(*type); + std::shared_ptr<ChunkedArrayBuilder> value_builder; + RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, + list_type.value_type(), &value_builder)); + *out = std::make_shared<ChunkedListArrayBuilder>( + task_group, pool, std::move(value_builder), list_type.value_field()); + return Status::OK(); + } + std::shared_ptr<Converter> converter; + RETURN_NOT_OK(MakeConverter(type, pool, &converter)); + 
if (promotion_graph) { + *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph, + std::move(converter)); + } else { + *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter)); + } + return Status::OK(); +} + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h new file mode 100644 index 00000000000..93b327bf3ae --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunked_builder.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> +#include <vector> + +#include "arrow/status.h" +#include "arrow/type_fwd.h" +#include "arrow/util/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace json { + +class PromotionGraph; + +class ARROW_EXPORT ChunkedArrayBuilder { + public: + virtual ~ChunkedArrayBuilder() = default; + + /// Spawn a task that will try to convert and insert the given JSON block + virtual void Insert(int64_t block_index, + const std::shared_ptr<Field>& unconverted_field, + const std::shared_ptr<Array>& unconverted) = 0; + + /// Return the final chunked array. + /// Every chunk must be inserted before this is called! + virtual Status Finish(std::shared_ptr<ChunkedArray>* out) = 0; + + /// Finish current task group and substitute a new one + virtual Status ReplaceTaskGroup( + const std::shared_ptr<arrow::internal::TaskGroup>& task_group) = 0; + + protected: + explicit ChunkedArrayBuilder( + const std::shared_ptr<arrow::internal::TaskGroup>& task_group) + : task_group_(task_group) {} + + std::shared_ptr<arrow::internal::TaskGroup> task_group_; +}; + +/// create a chunked builder +/// +/// if unexpected fields and promotion need to be handled, promotion_graph must be +/// non-null +ARROW_EXPORT Status MakeChunkedArrayBuilder( + const std::shared_ptr<arrow::internal::TaskGroup>& task_group, MemoryPool* pool, + const PromotionGraph* promotion_graph, const std::shared_ptr<DataType>& type, + std::shared_ptr<ChunkedArrayBuilder>* out); + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc new file mode 100644 index 00000000000..b4b4d31eb94 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.cc @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/chunker.h" + +#include <algorithm> +#include <utility> +#include <vector> + +#include "arrow/json/rapidjson_defs.h" +#include "rapidjson/reader.h" + +#include "arrow/buffer.h" +#include "arrow/json/options.h" +#include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/string_view.h" + +namespace arrow { + +using internal::make_unique; +using util::string_view; + +namespace json { + +namespace rj = arrow::rapidjson; + +static size_t ConsumeWhitespace(string_view view) { +#ifdef RAPIDJSON_SIMD + auto data = view.data(); + auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + view.size()); + return nonws_begin - data; +#else + auto ws_count = view.find_first_not_of(" \t\r\n"); + if (ws_count == string_view::npos) { + return view.size(); + } else { + return ws_count; + } +#endif +} + +/// RapidJson custom stream for reading JSON stored in multiple buffers +/// http://rapidjson.org/md_doc_stream.html#CustomStream +class MultiStringStream { + public: + using Ch = char; + explicit MultiStringStream(std::vector<string_view> strings) + : strings_(std::move(strings)) { + std::reverse(strings_.begin(), strings_.end()); + } + explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) { + for (size_t i = 0; 
i < buffers.size(); ++i) { + strings_[i] = string_view(*buffers[i]); + } + std::reverse(strings_.begin(), strings_.end()); + } + char Peek() const { + if (strings_.size() == 0) return '\0'; + return strings_.back()[0]; + } + char Take() { + if (strings_.size() == 0) return '\0'; + char taken = strings_.back()[0]; + if (strings_.back().size() == 1) { + strings_.pop_back(); + } else { + strings_.back() = strings_.back().substr(1); + } + ++index_; + return taken; + } + size_t Tell() { return index_; } + void Put(char) { ARROW_LOG(FATAL) << "not implemented"; } + void Flush() { ARROW_LOG(FATAL) << "not implemented"; } + char* PutBegin() { + ARROW_LOG(FATAL) << "not implemented"; + return nullptr; + } + size_t PutEnd(char*) { + ARROW_LOG(FATAL) << "not implemented"; + return 0; + } + + private: + size_t index_ = 0; + std::vector<string_view> strings_; +}; + +template <typename Stream> +static size_t ConsumeWholeObject(Stream&& stream) { + static constexpr unsigned parse_flags = rj::kParseIterativeFlag | + rj::kParseStopWhenDoneFlag | + rj::kParseNumbersAsStringsFlag; + rj::BaseReaderHandler<rj::UTF8<>> handler; + rj::Reader reader; + // parse a single JSON object + switch (reader.Parse<parse_flags>(stream, handler).Code()) { + case rj::kParseErrorNone: + return stream.Tell(); + case rj::kParseErrorDocumentEmpty: + return 0; + default: + // rapidjson emitted an error, the most recent object was partial + return string_view::npos; + } +} + +namespace { + +// A BoundaryFinder implementation that assumes JSON objects can contain raw newlines, +// and uses actual JSON parsing to delimit them. +class ParsingBoundaryFinder : public BoundaryFinder { + public: + Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override { + // NOTE: We could bubble up JSON parse errors here, but the actual parsing + // step will detect them later anyway. 
+ auto length = ConsumeWholeObject(MultiStringStream({partial, block})); + if (length == string_view::npos) { + *out_pos = -1; + } else { + DCHECK_GE(length, partial.size()); + DCHECK_LE(length, partial.size() + block.size()); + *out_pos = static_cast<int64_t>(length - partial.size()); + } + return Status::OK(); + } + + Status FindLast(util::string_view block, int64_t* out_pos) override { + const size_t block_length = block.size(); + size_t consumed_length = 0; + while (consumed_length < block_length) { + rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size()); + using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>; + auto length = ConsumeWholeObject(InputStream(ms)); + if (length == string_view::npos || length == 0) { + // found incomplete object or block is empty + break; + } + consumed_length += length; + block = block.substr(length); + } + if (consumed_length == 0) { + *out_pos = -1; + } else { + consumed_length += ConsumeWhitespace(block); + DCHECK_LE(consumed_length, block_length); + *out_pos = static_cast<int64_t>(consumed_length); + } + return Status::OK(); + } + + Status FindNth(util::string_view partial, util::string_view block, int64_t count, + int64_t* out_pos, int64_t* num_found) override { + return Status::NotImplemented("ParsingBoundaryFinder::FindNth"); + } +}; + +} // namespace + +std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) { + std::shared_ptr<BoundaryFinder> delimiter; + if (options.newlines_in_values) { + delimiter = std::make_shared<ParsingBoundaryFinder>(); + } else { + delimiter = MakeNewlineBoundaryFinder(); + } + return std::unique_ptr<Chunker>(new Chunker(std::move(delimiter))); +} + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h new file mode 100644 index 00000000000..9ed85126da1 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/chunker.h @@ 
-0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "arrow/util/delimiting.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace json { + +struct ParseOptions; + +ARROW_EXPORT +std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options); + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc new file mode 100644 index 00000000000..fe9500d40ca --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.cc @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/converter.h" + +#include <memory> +#include <utility> + +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/array/builder_time.h" +#include "arrow/json/parser.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" +#include "arrow/util/value_parsing.h" + +namespace arrow { + +using internal::checked_cast; +using util::string_view; + +namespace json { + +template <typename... Args> +Status GenericConversionError(const DataType& type, Args&&... 
args) { + return Status::Invalid("Failed of conversion of JSON to ", type, + std::forward<Args>(args)...); +} + +namespace { + +const DictionaryArray& GetDictionaryArray(const std::shared_ptr<Array>& in) { + DCHECK_EQ(in->type_id(), Type::DICTIONARY); + auto dict_type = checked_cast<const DictionaryType*>(in->type().get()); + DCHECK_EQ(dict_type->index_type()->id(), Type::INT32); + DCHECK_EQ(dict_type->value_type()->id(), Type::STRING); + return checked_cast<const DictionaryArray&>(*in); +} + +template <typename ValidVisitor, typename NullVisitor> +Status VisitDictionaryEntries(const DictionaryArray& dict_array, + ValidVisitor&& visit_valid, NullVisitor&& visit_null) { + const StringArray& dict = checked_cast<const StringArray&>(*dict_array.dictionary()); + const Int32Array& indices = checked_cast<const Int32Array&>(*dict_array.indices()); + for (int64_t i = 0; i < indices.length(); ++i) { + if (indices.IsValid(i)) { + RETURN_NOT_OK(visit_valid(dict.GetView(indices.GetView(i)))); + } else { + RETURN_NOT_OK(visit_null()); + } + } + return Status::OK(); +} + +} // namespace + +// base class for types which accept and output non-nested types +class PrimitiveConverter : public Converter { + public: + PrimitiveConverter(MemoryPool* pool, std::shared_ptr<DataType> out_type) + : Converter(pool, out_type) {} +}; + +class NullConverter : public PrimitiveConverter { + public: + using PrimitiveConverter::PrimitiveConverter; + + Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override { + if (in->type_id() != Type::NA) { + return GenericConversionError(*out_type_, " from ", *in->type()); + } + *out = in; + return Status::OK(); + } +}; + +class BooleanConverter : public PrimitiveConverter { + public: + using PrimitiveConverter::PrimitiveConverter; + + Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override { + if (in->type_id() == Type::NA) { + return MakeArrayOfNull(boolean(), in->length(), pool_).Value(out); + } + 
if (in->type_id() != Type::BOOL) { + return GenericConversionError(*out_type_, " from ", *in->type()); + } + *out = in; + return Status::OK(); + } +}; + +template <typename T> +class NumericConverter : public PrimitiveConverter { + public: + using value_type = typename T::c_type; + + NumericConverter(MemoryPool* pool, const std::shared_ptr<DataType>& type) + : PrimitiveConverter(pool, type), numeric_type_(checked_cast<const T&>(*type)) {} + + Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override { + if (in->type_id() == Type::NA) { + return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out); + } + const auto& dict_array = GetDictionaryArray(in); + + using Builder = typename TypeTraits<T>::BuilderType; + Builder builder(out_type_, pool_); + RETURN_NOT_OK(builder.Resize(dict_array.indices()->length())); + + auto visit_valid = [&](string_view repr) { + value_type value; + if (!arrow::internal::ParseValue(numeric_type_, repr.data(), repr.size(), &value)) { + return GenericConversionError(*out_type_, ", couldn't parse:", repr); + } + + builder.UnsafeAppend(value); + return Status::OK(); + }; + + auto visit_null = [&]() { + builder.UnsafeAppendNull(); + return Status::OK(); + }; + + RETURN_NOT_OK(VisitDictionaryEntries(dict_array, visit_valid, visit_null)); + return builder.Finish(out); + } + + const T& numeric_type_; +}; + +template <typename DateTimeType> +class DateTimeConverter : public PrimitiveConverter { + public: + DateTimeConverter(MemoryPool* pool, const std::shared_ptr<DataType>& type) + : PrimitiveConverter(pool, type), converter_(pool, repr_type()) {} + + Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override { + if (in->type_id() == Type::NA) { + return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out); + } + + std::shared_ptr<Array> repr; + RETURN_NOT_OK(converter_.Convert(in, &repr)); + + auto out_data = repr->data()->Copy(); + out_data->type = out_type_; + *out = 
MakeArray(out_data); + + return Status::OK(); + } + + private: + using ReprType = typename CTypeTraits<typename DateTimeType::c_type>::ArrowType; + static std::shared_ptr<DataType> repr_type() { + return TypeTraits<ReprType>::type_singleton(); + } + NumericConverter<ReprType> converter_; +}; + +template <typename T> +class BinaryConverter : public PrimitiveConverter { + public: + using PrimitiveConverter::PrimitiveConverter; + + Status Convert(const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) override { + if (in->type_id() == Type::NA) { + return MakeArrayOfNull(out_type_, in->length(), pool_).Value(out); + } + const auto& dict_array = GetDictionaryArray(in); + + using Builder = typename TypeTraits<T>::BuilderType; + Builder builder(out_type_, pool_); + RETURN_NOT_OK(builder.Resize(dict_array.indices()->length())); + + // TODO(bkietz) this can be computed during parsing at low cost + int64_t data_length = 0; + auto visit_lengths_valid = [&](string_view value) { + data_length += value.size(); + return Status::OK(); + }; + + auto visit_lengths_null = [&]() { + // no-op + return Status::OK(); + }; + + RETURN_NOT_OK( + VisitDictionaryEntries(dict_array, visit_lengths_valid, visit_lengths_null)); + RETURN_NOT_OK(builder.ReserveData(data_length)); + + auto visit_valid = [&](string_view value) { + builder.UnsafeAppend(value); + return Status::OK(); + }; + + auto visit_null = [&]() { + builder.UnsafeAppendNull(); + return Status::OK(); + }; + + RETURN_NOT_OK(VisitDictionaryEntries(dict_array, visit_valid, visit_null)); + return builder.Finish(out); + } +}; + +Status MakeConverter(const std::shared_ptr<DataType>& out_type, MemoryPool* pool, + std::shared_ptr<Converter>* out) { + switch (out_type->id()) { +#define CONVERTER_CASE(TYPE_ID, CONVERTER_TYPE) \ + case TYPE_ID: \ + *out = std::make_shared<CONVERTER_TYPE>(pool, out_type); \ + break + CONVERTER_CASE(Type::NA, NullConverter); + CONVERTER_CASE(Type::BOOL, BooleanConverter); + CONVERTER_CASE(Type::INT8, 
NumericConverter<Int8Type>); + CONVERTER_CASE(Type::INT16, NumericConverter<Int16Type>); + CONVERTER_CASE(Type::INT32, NumericConverter<Int32Type>); + CONVERTER_CASE(Type::INT64, NumericConverter<Int64Type>); + CONVERTER_CASE(Type::UINT8, NumericConverter<UInt8Type>); + CONVERTER_CASE(Type::UINT16, NumericConverter<UInt16Type>); + CONVERTER_CASE(Type::UINT32, NumericConverter<UInt32Type>); + CONVERTER_CASE(Type::UINT64, NumericConverter<UInt64Type>); + CONVERTER_CASE(Type::FLOAT, NumericConverter<FloatType>); + CONVERTER_CASE(Type::DOUBLE, NumericConverter<DoubleType>); + CONVERTER_CASE(Type::TIMESTAMP, NumericConverter<TimestampType>); + CONVERTER_CASE(Type::TIME32, DateTimeConverter<Time32Type>); + CONVERTER_CASE(Type::TIME64, DateTimeConverter<Time64Type>); + CONVERTER_CASE(Type::DATE32, DateTimeConverter<Date32Type>); + CONVERTER_CASE(Type::DATE64, DateTimeConverter<Date64Type>); + CONVERTER_CASE(Type::BINARY, BinaryConverter<BinaryType>); + CONVERTER_CASE(Type::STRING, BinaryConverter<StringType>); + CONVERTER_CASE(Type::LARGE_BINARY, BinaryConverter<LargeBinaryType>); + CONVERTER_CASE(Type::LARGE_STRING, BinaryConverter<LargeStringType>); + default: + return Status::NotImplemented("JSON conversion to ", *out_type, + " is not supported"); +#undef CONVERTER_CASE + } + return Status::OK(); +} + +const PromotionGraph* GetPromotionGraph() { + static struct : PromotionGraph { + std::shared_ptr<Field> Null(const std::string& name) const override { + return field(name, null(), true, Kind::Tag(Kind::kNull)); + } + + std::shared_ptr<DataType> Infer( + const std::shared_ptr<Field>& unexpected_field) const override { + auto kind = Kind::FromTag(unexpected_field->metadata()); + switch (kind) { + case Kind::kNull: + return null(); + + case Kind::kBoolean: + return boolean(); + + case Kind::kNumber: + return int64(); + + case Kind::kString: + return timestamp(TimeUnit::SECOND); + + case Kind::kArray: { + const auto& type = checked_cast<const 
ListType&>(*unexpected_field->type()); + auto value_field = type.value_field(); + return list(value_field->WithType(Infer(value_field))); + } + case Kind::kObject: { + auto fields = unexpected_field->type()->fields(); + for (auto& field : fields) { + field = field->WithType(Infer(field)); + } + return struct_(std::move(fields)); + } + default: + return nullptr; + } + } + + std::shared_ptr<DataType> Promote( + const std::shared_ptr<DataType>& failed, + const std::shared_ptr<Field>& unexpected_field) const override { + switch (failed->id()) { + case Type::NA: + return Infer(unexpected_field); + + case Type::TIMESTAMP: + return utf8(); + + case Type::INT64: + return float64(); + + default: + return nullptr; + } + } + } impl; + + return &impl; +} + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h new file mode 100644 index 00000000000..9a812dd3c3a --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/converter.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> +#include <string> + +#include "arrow/status.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class DataType; +class Field; +class MemoryPool; + +namespace json { + +/// \brief interface for conversion of Arrays +/// +/// Converters are not required to be correct for arbitrary input- only +/// for unconverted arrays emitted by a corresponding parser. +class ARROW_EXPORT Converter { + public: + virtual ~Converter() = default; + + /// convert an array + /// on failure, this converter may be promoted to another converter which + /// *can* convert the given input. + virtual Status Convert(const std::shared_ptr<Array>& in, + std::shared_ptr<Array>* out) = 0; + + std::shared_ptr<DataType> out_type() const { return out_type_; } + + MemoryPool* pool() { return pool_; } + + protected: + ARROW_DISALLOW_COPY_AND_ASSIGN(Converter); + + Converter(MemoryPool* pool, const std::shared_ptr<DataType>& out_type) + : pool_(pool), out_type_(out_type) {} + + MemoryPool* pool_; + std::shared_ptr<DataType> out_type_; +}; + +/// \brief produce a single converter to the specified out_type +ARROW_EXPORT Status MakeConverter(const std::shared_ptr<DataType>& out_type, + MemoryPool* pool, std::shared_ptr<Converter>* out); + +class ARROW_EXPORT PromotionGraph { + public: + virtual ~PromotionGraph() = default; + + /// \brief produce a valid field which will be inferred as null + virtual std::shared_ptr<Field> Null(const std::string& name) const = 0; + + /// \brief given an unexpected field encountered during parsing, return a type to which + /// it may be convertible (may return null if none is available) + virtual std::shared_ptr<DataType> Infer( + const std::shared_ptr<Field>& unexpected_field) const = 0; + + /// \brief given a type to which conversion failed, return a promoted type to which + /// conversion may succeed (may return null if none is available) + virtual std::shared_ptr<DataType> 
Promote( + const std::shared_ptr<DataType>& failed, + const std::shared_ptr<Field>& unexpected_field) const = 0; + + protected: + ARROW_DISALLOW_COPY_AND_ASSIGN(PromotionGraph); + PromotionGraph() = default; +}; + +ARROW_EXPORT const PromotionGraph* GetPromotionGraph(); + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc new file mode 100644 index 00000000000..c857cd537e7 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.cc @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/json/object_parser.h" +#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep + +#include <rapidjson/document.h> + +namespace arrow { +namespace json { +namespace internal { + +namespace rj = arrow::rapidjson; + +class ObjectParser::Impl { + public: + Status Parse(arrow::util::string_view json) { + document_.Parse(reinterpret_cast<const rj::Document::Ch*>(json.data()), + static_cast<size_t>(json.size())); + + if (document_.HasParseError()) { + return Status::Invalid("Json parse error (offset ", document_.GetErrorOffset(), + "): ", document_.GetParseError()); + } + if (!document_.IsObject()) { + return Status::TypeError("Not a json object"); + } + return Status::OK(); + } + + Result<std::string> GetString(const char* key) const { + if (!document_.HasMember(key)) { + return Status::KeyError("Key '", key, "' does not exist"); + } + if (!document_[key].IsString()) { + return Status::TypeError("Key '", key, "' is not a string"); + } + return document_[key].GetString(); + } + + Result<bool> GetBool(const char* key) const { + if (!document_.HasMember(key)) { + return Status::KeyError("Key '", key, "' does not exist"); + } + if (!document_[key].IsBool()) { + return Status::TypeError("Key '", key, "' is not a boolean"); + } + return document_[key].GetBool(); + } + + private: + rj::Document document_; +}; + +ObjectParser::ObjectParser() : impl_(new ObjectParser::Impl()) {} + +ObjectParser::~ObjectParser() = default; + +Status ObjectParser::Parse(arrow::util::string_view json) { return impl_->Parse(json); } + +Result<std::string> ObjectParser::GetString(const char* key) const { + return impl_->GetString(key); +} + +Result<bool> ObjectParser::GetBool(const char* key) const { return impl_->GetBool(key); } + +} // namespace internal +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h new file mode 100644 index 
00000000000..ef93201651a --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_parser.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "arrow/result.h" +#include "arrow/util/string_view.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace json { +namespace internal { + +/// This class is a helper to parse a json object from a string. +/// It uses rapidjson::Document in implementation. 
+class ARROW_EXPORT ObjectParser { + public: + ObjectParser(); + ~ObjectParser(); + + Status Parse(arrow::util::string_view json); + + Result<std::string> GetString(const char* key) const; + Result<bool> GetBool(const char* key) const; + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +} // namespace internal +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc new file mode 100644 index 00000000000..06d09f81e94 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.cc @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/json/object_writer.h" +#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep + +#include <rapidjson/document.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> + +namespace rj = arrow::rapidjson; + +namespace arrow { +namespace json { +namespace internal { + +class ObjectWriter::Impl { + public: + Impl() : root_(rj::kObjectType) {} + + void SetString(arrow::util::string_view key, arrow::util::string_view value) { + rj::Document::AllocatorType& allocator = document_.GetAllocator(); + + rj::Value str_key(key.data(), allocator); + rj::Value str_value(value.data(), allocator); + + root_.AddMember(str_key, str_value, allocator); + } + + void SetBool(arrow::util::string_view key, bool value) { + rj::Document::AllocatorType& allocator = document_.GetAllocator(); + + rj::Value str_key(key.data(), allocator); + + root_.AddMember(str_key, value, allocator); + } + + std::string Serialize() { + rj::StringBuffer buffer; + rj::Writer<rj::StringBuffer> writer(buffer); + root_.Accept(writer); + + return buffer.GetString(); + } + + private: + rj::Document document_; + rj::Value root_; +}; + +ObjectWriter::ObjectWriter() : impl_(new ObjectWriter::Impl()) {} + +ObjectWriter::~ObjectWriter() = default; + +void ObjectWriter::SetString(arrow::util::string_view key, + arrow::util::string_view value) { + impl_->SetString(key, value); +} + +void ObjectWriter::SetBool(arrow::util::string_view key, bool value) { + impl_->SetBool(key, value); +} + +std::string ObjectWriter::Serialize() { return impl_->Serialize(); } + +} // namespace internal +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h new file mode 100644 index 00000000000..55ff0ce52bc --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/object_writer.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "arrow/util/string_view.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace json { +namespace internal { + +/// This class is a helper to serialize a json object to a string. +/// It uses rapidjson in implementation. +class ARROW_EXPORT ObjectWriter { + public: + ObjectWriter(); + ~ObjectWriter(); + + void SetString(arrow::util::string_view key, arrow::util::string_view value); + void SetBool(arrow::util::string_view key, bool value); + + std::string Serialize(); + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +} // namespace internal +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc new file mode 100644 index 00000000000..dc5e628b1f3 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.cc @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/options.h" + +namespace arrow { +namespace json { + +ParseOptions ParseOptions::Defaults() { return ParseOptions(); } + +ReadOptions ReadOptions::Defaults() { return ReadOptions(); } + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h new file mode 100644 index 00000000000..d7edab9cedd --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/options.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#pragma once

#include <cstdint>
#include <memory>

#include "arrow/json/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {

class DataType;
class Schema;

namespace json {

/// Policy for JSON fields that are not part of ParseOptions::explicit_schema
enum class UnexpectedFieldBehavior : char {
  /// Unexpected JSON fields are ignored
  Ignore,
  /// Unexpected JSON fields error out
  Error,
  /// Unexpected JSON fields are type-inferred and included in the output
  InferType
};

/// Options for parsing JSON: the expected schema, the accepted layout of
/// values, and the treatment of fields outside the schema
struct ARROW_EXPORT ParseOptions {
  // Parsing options

  /// Optional explicit schema (disables type inference on those fields)
  std::shared_ptr<Schema> explicit_schema;

  /// Whether objects may be printed across multiple lines (for example pretty-printed)
  ///
  /// If true, parsing may be slower.
  bool newlines_in_values = false;

  /// How JSON fields outside of explicit_schema (if given) are treated
  UnexpectedFieldBehavior unexpected_field_behavior = UnexpectedFieldBehavior::InferType;

  /// Create parsing options with default values
  static ParseOptions Defaults();
};

/// Options for reading input: threading and the size of requested blocks
struct ARROW_EXPORT ReadOptions {
  // Reader options

  /// Whether to use the global CPU thread pool
  bool use_threads = true;
  /// Block size we request from the IO layer; also determines the size of
  /// chunks when use_threads is true
  int32_t block_size = 1 << 20;  // 1 MB (1 << 20 == 1048576)

  /// Create read options with default values
  static ReadOptions Defaults();
};

}  // namespace json
}  // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc
new file mode 100644
index 00000000000..05f155645a6
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.cc
@@ -0,0 +1,1099 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/parser.h" + +#include <functional> +#include <limits> +#include <tuple> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "arrow/json/rapidjson_defs.h" +#include "rapidjson/error/en.h" +#include "rapidjson/reader.h" + +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/buffer_builder.h" +#include "arrow/type.h" +#include "arrow/util/bitset_stack.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/string_view.h" +#include "arrow/util/trie.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitsetStack; +using internal::checked_cast; +using internal::make_unique; +using util::string_view; + +namespace json { + +namespace rj = arrow::rapidjson; + +template <typename... T> +static Status ParseError(T&&... 
t) { + return Status::Invalid("JSON parse error: ", std::forward<T>(t)...); +} + +const std::string& Kind::Name(Kind::type kind) { + static const std::string names[] = {"null", "boolean", "number", + "string", "array", "object"}; + + return names[kind]; +} + +const std::shared_ptr<const KeyValueMetadata>& Kind::Tag(Kind::type kind) { + static const std::shared_ptr<const KeyValueMetadata> tags[] = { + key_value_metadata({{"json_kind", Kind::Name(Kind::kNull)}}), + key_value_metadata({{"json_kind", Kind::Name(Kind::kBoolean)}}), + key_value_metadata({{"json_kind", Kind::Name(Kind::kNumber)}}), + key_value_metadata({{"json_kind", Kind::Name(Kind::kString)}}), + key_value_metadata({{"json_kind", Kind::Name(Kind::kArray)}}), + key_value_metadata({{"json_kind", Kind::Name(Kind::kObject)}}), + }; + return tags[kind]; +} + +static arrow::internal::Trie MakeFromTagTrie() { + arrow::internal::TrieBuilder builder; + for (auto kind : {Kind::kNull, Kind::kBoolean, Kind::kNumber, Kind::kString, + Kind::kArray, Kind::kObject}) { + DCHECK_OK(builder.Append(Kind::Name(kind))); + } + auto name_to_kind = builder.Finish(); + DCHECK_OK(name_to_kind.Validate()); + return name_to_kind; +} + +Kind::type Kind::FromTag(const std::shared_ptr<const KeyValueMetadata>& tag) { + static arrow::internal::Trie name_to_kind = MakeFromTagTrie(); + DCHECK_NE(tag->FindKey("json_kind"), -1); + util::string_view name = tag->value(tag->FindKey("json_kind")); + DCHECK_NE(name_to_kind.Find(name), -1); + return static_cast<Kind::type>(name_to_kind.Find(name)); +} + +Status Kind::ForType(const DataType& type, Kind::type* kind) { + struct { + Status Visit(const NullType&) { return SetKind(Kind::kNull); } + Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); } + Status Visit(const NumberType&) { return SetKind(Kind::kNumber); } + Status Visit(const TimeType&) { return SetKind(Kind::kNumber); } + Status Visit(const DateType&) { return SetKind(Kind::kNumber); } + Status Visit(const BinaryType&) { 
return SetKind(Kind::kString); } + Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); } + Status Visit(const DictionaryType& dict_type) { + return Kind::ForType(*dict_type.value_type(), kind_); + } + Status Visit(const ListType&) { return SetKind(Kind::kArray); } + Status Visit(const StructType&) { return SetKind(Kind::kObject); } + Status Visit(const DataType& not_impl) { + return Status::NotImplemented("JSON parsing of ", not_impl); + } + Status SetKind(Kind::type kind) { + *kind_ = kind; + return Status::OK(); + } + Kind::type* kind_; + } visitor = {kind}; + return VisitTypeInline(type, &visitor); +} + +/// \brief ArrayBuilder for parsed but unconverted arrays +template <Kind::type> +class RawArrayBuilder; + +/// \brief packed pointer to a RawArrayBuilder +/// +/// RawArrayBuilders are stored in HandlerBase, +/// which allows storage of their indices (uint32_t) instead of a full pointer. +/// BuilderPtr is also tagged with the json kind and nullable properties +/// so those can be accessed before dereferencing the builder. 
+struct BuilderPtr { + BuilderPtr() : BuilderPtr(BuilderPtr::null) {} + BuilderPtr(Kind::type k, uint32_t i, bool n) : index(i), kind(k), nullable(n) {} + + BuilderPtr(const BuilderPtr&) = default; + BuilderPtr& operator=(const BuilderPtr&) = default; + BuilderPtr(BuilderPtr&&) = default; + BuilderPtr& operator=(BuilderPtr&&) = default; + + // index of builder in its arena + // OR the length of that builder if kind == Kind::kNull + // (we don't allocate an arena for nulls since they're trivial) + uint32_t index; + Kind::type kind; + bool nullable; + + bool operator==(BuilderPtr other) const { + return kind == other.kind && index == other.index; + } + + bool operator!=(BuilderPtr other) const { return !(other == *this); } + + operator bool() const { return *this != null; } + + bool operator!() const { return *this == null; } + + // The static BuilderPtr for null type data + static const BuilderPtr null; +}; + +const BuilderPtr BuilderPtr::null(Kind::kNull, 0, true); + +template <> +class RawArrayBuilder<Kind::kBoolean> { + public: + explicit RawArrayBuilder(MemoryPool* pool) + : data_builder_(pool), null_bitmap_builder_(pool) {} + + Status Append(bool value) { + RETURN_NOT_OK(data_builder_.Append(value)); + return null_bitmap_builder_.Append(true); + } + + Status AppendNull() { + RETURN_NOT_OK(data_builder_.Append(false)); + return null_bitmap_builder_.Append(false); + } + + Status AppendNull(int64_t count) { + RETURN_NOT_OK(data_builder_.Append(count, false)); + return null_bitmap_builder_.Append(count, false); + } + + Status Finish(std::shared_ptr<Array>* out) { + auto size = length(); + auto null_count = null_bitmap_builder_.false_count(); + std::shared_ptr<Buffer> data, null_bitmap; + RETURN_NOT_OK(data_builder_.Finish(&data)); + RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + *out = MakeArray(ArrayData::Make(boolean(), size, {null_bitmap, data}, null_count)); + return Status::OK(); + } + + int64_t length() { return null_bitmap_builder_.length(); } + 
+ private: + TypedBufferBuilder<bool> data_builder_; + TypedBufferBuilder<bool> null_bitmap_builder_; +}; + +/// \brief builder for strings or unconverted numbers +/// +/// Both of these are represented in the builder as an index only; +/// the actual characters are stored in a single StringArray (into which +/// an index refers). This means building is faster since we don't do +/// allocation for string/number characters but accessing is strided. +/// +/// On completion the indices and the character storage are combined +/// into a dictionary-encoded array, which is a convenient container +/// for indices referring into another array. +class ScalarBuilder { + public: + explicit ScalarBuilder(MemoryPool* pool) + : values_length_(0), data_builder_(pool), null_bitmap_builder_(pool) {} + + Status Append(int32_t index, int32_t value_length) { + RETURN_NOT_OK(data_builder_.Append(index)); + values_length_ += value_length; + return null_bitmap_builder_.Append(true); + } + + Status AppendNull() { + RETURN_NOT_OK(data_builder_.Append(0)); + return null_bitmap_builder_.Append(false); + } + + Status AppendNull(int64_t count) { + RETURN_NOT_OK(data_builder_.Append(count, 0)); + return null_bitmap_builder_.Append(count, false); + } + + Status Finish(std::shared_ptr<Array>* out) { + auto size = length(); + auto null_count = null_bitmap_builder_.false_count(); + std::shared_ptr<Buffer> data, null_bitmap; + RETURN_NOT_OK(data_builder_.Finish(&data)); + RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + *out = MakeArray(ArrayData::Make(int32(), size, {null_bitmap, data}, null_count)); + return Status::OK(); + } + + int64_t length() { return null_bitmap_builder_.length(); } + + int32_t values_length() { return values_length_; } + + private: + int32_t values_length_; + TypedBufferBuilder<int32_t> data_builder_; + TypedBufferBuilder<bool> null_bitmap_builder_; +}; + +template <> +class RawArrayBuilder<Kind::kNumber> : public ScalarBuilder { + public: + using 
ScalarBuilder::ScalarBuilder; +}; + +template <> +class RawArrayBuilder<Kind::kString> : public ScalarBuilder { + public: + using ScalarBuilder::ScalarBuilder; +}; + +template <> +class RawArrayBuilder<Kind::kArray> { + public: + explicit RawArrayBuilder(MemoryPool* pool) + : offset_builder_(pool), null_bitmap_builder_(pool) {} + + Status Append(int32_t child_length) { + RETURN_NOT_OK(offset_builder_.Append(offset_)); + offset_ += child_length; + return null_bitmap_builder_.Append(true); + } + + Status AppendNull() { + RETURN_NOT_OK(offset_builder_.Append(offset_)); + return null_bitmap_builder_.Append(false); + } + + Status AppendNull(int64_t count) { + RETURN_NOT_OK(offset_builder_.Append(count, offset_)); + return null_bitmap_builder_.Append(count, false); + } + + Status Finish(std::function<Status(BuilderPtr, std::shared_ptr<Array>*)> finish_child, + std::shared_ptr<Array>* out) { + RETURN_NOT_OK(offset_builder_.Append(offset_)); + auto size = length(); + auto null_count = null_bitmap_builder_.false_count(); + std::shared_ptr<Buffer> offsets, null_bitmap; + RETURN_NOT_OK(offset_builder_.Finish(&offsets)); + RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + std::shared_ptr<Array> values; + RETURN_NOT_OK(finish_child(value_builder_, &values)); + auto type = list(field("item", values->type(), value_builder_.nullable, + Kind::Tag(value_builder_.kind))); + *out = MakeArray(ArrayData::Make(type, size, {null_bitmap, offsets}, {values->data()}, + null_count)); + return Status::OK(); + } + + BuilderPtr value_builder() const { return value_builder_; } + + void value_builder(BuilderPtr builder) { value_builder_ = builder; } + + int64_t length() { return null_bitmap_builder_.length(); } + + private: + BuilderPtr value_builder_ = BuilderPtr::null; + int32_t offset_ = 0; + TypedBufferBuilder<int32_t> offset_builder_; + TypedBufferBuilder<bool> null_bitmap_builder_; +}; + +template <> +class RawArrayBuilder<Kind::kObject> { + public: + explicit 
RawArrayBuilder(MemoryPool* pool) : null_bitmap_builder_(pool) {} + + Status Append() { return null_bitmap_builder_.Append(true); } + + Status AppendNull() { return null_bitmap_builder_.Append(false); } + + Status AppendNull(int64_t count) { return null_bitmap_builder_.Append(count, false); } + + std::string FieldName(int i) const { + for (const auto& name_index : name_to_index_) { + if (name_index.second == i) { + return name_index.first; + } + } + return ""; + } + + int GetFieldIndex(const std::string& name) const { + auto it = name_to_index_.find(name); + if (it == name_to_index_.end()) { + return -1; + } + return it->second; + } + + int AddField(std::string name, BuilderPtr builder) { + auto index = num_fields(); + field_builders_.push_back(builder); + name_to_index_.emplace(std::move(name), index); + return index; + } + + int num_fields() const { return static_cast<int>(field_builders_.size()); } + + BuilderPtr field_builder(int index) const { return field_builders_[index]; } + + void field_builder(int index, BuilderPtr builder) { field_builders_[index] = builder; } + + Status Finish(std::function<Status(BuilderPtr, std::shared_ptr<Array>*)> finish_child, + std::shared_ptr<Array>* out) { + auto size = length(); + auto null_count = null_bitmap_builder_.false_count(); + std::shared_ptr<Buffer> null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + + std::vector<string_view> field_names(num_fields()); + for (const auto& name_index : name_to_index_) { + field_names[name_index.second] = name_index.first; + } + + std::vector<std::shared_ptr<Field>> fields(num_fields()); + std::vector<std::shared_ptr<ArrayData>> child_data(num_fields()); + for (int i = 0; i < num_fields(); ++i) { + std::shared_ptr<Array> field_values; + RETURN_NOT_OK(finish_child(field_builders_[i], &field_values)); + child_data[i] = field_values->data(); + fields[i] = field(std::string(field_names[i]), field_values->type(), + field_builders_[i].nullable, 
Kind::Tag(field_builders_[i].kind)); + } + + *out = MakeArray(ArrayData::Make(struct_(std::move(fields)), size, {null_bitmap}, + std::move(child_data), null_count)); + return Status::OK(); + } + + int64_t length() { return null_bitmap_builder_.length(); } + + private: + std::vector<BuilderPtr> field_builders_; + std::unordered_map<std::string, int> name_to_index_; + TypedBufferBuilder<bool> null_bitmap_builder_; +}; + +class RawBuilderSet { + public: + explicit RawBuilderSet(MemoryPool* pool) : pool_(pool) {} + + /// Retrieve a pointer to a builder from a BuilderPtr + template <Kind::type kind> + enable_if_t<kind != Kind::kNull, RawArrayBuilder<kind>*> Cast(BuilderPtr builder) { + DCHECK_EQ(builder.kind, kind); + return arena<kind>().data() + builder.index; + } + + /// construct a builder of statically defined kind + template <Kind::type kind> + Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) { + builder->index = static_cast<uint32_t>(arena<kind>().size()); + builder->kind = kind; + builder->nullable = true; + arena<kind>().emplace_back(RawArrayBuilder<kind>(pool_)); + return Cast<kind>(*builder)->AppendNull(leading_nulls); + } + + /// construct a builder of whatever kind corresponds to a DataType + Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) { + Kind::type kind; + RETURN_NOT_OK(Kind::ForType(t, &kind)); + switch (kind) { + case Kind::kNull: + *builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true); + return Status::OK(); + + case Kind::kBoolean: + return MakeBuilder<Kind::kBoolean>(leading_nulls, builder); + + case Kind::kNumber: + return MakeBuilder<Kind::kNumber>(leading_nulls, builder); + + case Kind::kString: + return MakeBuilder<Kind::kString>(leading_nulls, builder); + + case Kind::kArray: { + RETURN_NOT_OK(MakeBuilder<Kind::kArray>(leading_nulls, builder)); + const auto& list_type = checked_cast<const ListType&>(t); + + BuilderPtr value_builder; + 
RETURN_NOT_OK(MakeBuilder(*list_type.value_type(), 0, &value_builder)); + value_builder.nullable = list_type.value_field()->nullable(); + + Cast<Kind::kArray>(*builder)->value_builder(value_builder); + return Status::OK(); + } + case Kind::kObject: { + RETURN_NOT_OK(MakeBuilder<Kind::kObject>(leading_nulls, builder)); + const auto& struct_type = checked_cast<const StructType&>(t); + + for (const auto& f : struct_type.fields()) { + BuilderPtr field_builder; + RETURN_NOT_OK(MakeBuilder(*f->type(), leading_nulls, &field_builder)); + field_builder.nullable = f->nullable(); + + Cast<Kind::kObject>(*builder)->AddField(f->name(), field_builder); + } + return Status::OK(); + } + default: + return Status::NotImplemented("invalid builder type"); + } + } + + /// Appending null is slightly tricky since null count is stored inline + /// for builders of Kind::kNull. Append nulls using this helper + Status AppendNull(BuilderPtr parent, int field_index, BuilderPtr builder) { + if (ARROW_PREDICT_FALSE(!builder.nullable)) { + return ParseError("a required field was null"); + } + switch (builder.kind) { + case Kind::kNull: { + DCHECK_EQ(builder, parent.kind == Kind::kArray + ? 
Cast<Kind::kArray>(parent)->value_builder() + : Cast<Kind::kObject>(parent)->field_builder(field_index)); + + // increment null count stored inline + builder.index += 1; + + // update the parent, since changing builder doesn't affect parent + if (parent.kind == Kind::kArray) { + Cast<Kind::kArray>(parent)->value_builder(builder); + } else { + Cast<Kind::kObject>(parent)->field_builder(field_index, builder); + } + return Status::OK(); + } + case Kind::kBoolean: + return Cast<Kind::kBoolean>(builder)->AppendNull(); + + case Kind::kNumber: + return Cast<Kind::kNumber>(builder)->AppendNull(); + + case Kind::kString: + return Cast<Kind::kString>(builder)->AppendNull(); + + case Kind::kArray: + return Cast<Kind::kArray>(builder)->AppendNull(); + + case Kind::kObject: { + auto struct_builder = Cast<Kind::kObject>(builder); + RETURN_NOT_OK(struct_builder->AppendNull()); + + for (int i = 0; i < struct_builder->num_fields(); ++i) { + auto field_builder = struct_builder->field_builder(i); + RETURN_NOT_OK(AppendNull(builder, i, field_builder)); + } + return Status::OK(); + } + default: + return Status::NotImplemented("invalid builder Kind"); + } + } + + Status Finish(const std::shared_ptr<Array>& scalar_values, BuilderPtr builder, + std::shared_ptr<Array>* out) { + auto finish_children = [this, &scalar_values](BuilderPtr child, + std::shared_ptr<Array>* out) { + return Finish(scalar_values, child, out); + }; + switch (builder.kind) { + case Kind::kNull: { + auto length = static_cast<int64_t>(builder.index); + *out = std::make_shared<NullArray>(length); + return Status::OK(); + } + case Kind::kBoolean: + return Cast<Kind::kBoolean>(builder)->Finish(out); + + case Kind::kNumber: + return FinishScalar(scalar_values, Cast<Kind::kNumber>(builder), out); + + case Kind::kString: + return FinishScalar(scalar_values, Cast<Kind::kString>(builder), out); + + case Kind::kArray: + return Cast<Kind::kArray>(builder)->Finish(std::move(finish_children), out); + + case Kind::kObject: + return 
Cast<Kind::kObject>(builder)->Finish(std::move(finish_children), out); + + default: + return Status::NotImplemented("invalid builder kind"); + } + } + + private: + /// finish a column of scalar values (string or number) + Status FinishScalar(const std::shared_ptr<Array>& scalar_values, ScalarBuilder* builder, + std::shared_ptr<Array>* out) { + std::shared_ptr<Array> indices; + // TODO(bkietz) embed builder->values_length() in this output somehow + RETURN_NOT_OK(builder->Finish(&indices)); + auto ty = dictionary(int32(), scalar_values->type()); + *out = std::make_shared<DictionaryArray>(ty, indices, scalar_values); + return Status::OK(); + } + + template <Kind::type kind> + std::vector<RawArrayBuilder<kind>>& arena() { + return std::get<static_cast<std::size_t>(kind)>(arenas_); + } + + MemoryPool* pool_; + std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>, + std::vector<RawArrayBuilder<Kind::kNumber>>, + std::vector<RawArrayBuilder<Kind::kString>>, + std::vector<RawArrayBuilder<Kind::kArray>>, + std::vector<RawArrayBuilder<Kind::kObject>>> + arenas_; +}; + +/// Three implementations are provided for BlockParser, one for each +/// UnexpectedFieldBehavior. 
However most of the logic is identical in each +/// case, so the majority of the implementation is in this base class +class HandlerBase : public BlockParser, + public rj::BaseReaderHandler<rj::UTF8<>, HandlerBase> { + public: + explicit HandlerBase(MemoryPool* pool) + : BlockParser(pool), + builder_set_(pool), + field_index_(-1), + scalar_values_builder_(pool) {} + + /// Retrieve a pointer to a builder from a BuilderPtr + template <Kind::type kind> + enable_if_t<kind != Kind::kNull, RawArrayBuilder<kind>*> Cast(BuilderPtr builder) { + return builder_set_.Cast<kind>(builder); + } + + /// Accessor for a stored error Status + Status Error() { return status_; } + + /// \defgroup rapidjson-handler-interface functions expected by rj::Reader + /// + /// bool Key(const char* data, rj::SizeType size, ...) is omitted since + /// the behavior varies greatly between UnexpectedFieldBehaviors + /// + /// @{ + bool Null() { + status_ = builder_set_.AppendNull(builder_stack_.back(), field_index_, builder_); + return status_.ok(); + } + + bool Bool(bool value) { + constexpr auto kind = Kind::kBoolean; + if (ARROW_PREDICT_FALSE(builder_.kind != kind)) { + status_ = IllegallyChangedTo(kind); + return status_.ok(); + } + status_ = Cast<kind>(builder_)->Append(value); + return status_.ok(); + } + + bool RawNumber(const char* data, rj::SizeType size, ...) { + status_ = AppendScalar<Kind::kNumber>(builder_, string_view(data, size)); + return status_.ok(); + } + + bool String(const char* data, rj::SizeType size, ...) { + status_ = AppendScalar<Kind::kString>(builder_, string_view(data, size)); + return status_.ok(); + } + + bool StartObject() { + status_ = StartObjectImpl(); + return status_.ok(); + } + + bool EndObject(...) 
{ + status_ = EndObjectImpl(); + return status_.ok(); + } + + bool StartArray() { + status_ = StartArrayImpl(); + return status_.ok(); + } + + bool EndArray(rj::SizeType size) { + status_ = EndArrayImpl(size); + return status_.ok(); + } + /// @} + + /// \brief Set up builders using an expected Schema + Status Initialize(const std::shared_ptr<Schema>& s) { + auto type = struct_({}); + if (s) { + type = struct_(s->fields()); + } + return builder_set_.MakeBuilder(*type, 0, &builder_); + } + + Status Finish(std::shared_ptr<Array>* parsed) override { + std::shared_ptr<Array> scalar_values; + RETURN_NOT_OK(scalar_values_builder_.Finish(&scalar_values)); + return builder_set_.Finish(scalar_values, builder_, parsed); + } + + /// \brief Emit path of current field for debugging purposes + std::string Path() { + std::string path; + for (size_t i = 0; i < builder_stack_.size(); ++i) { + auto builder = builder_stack_[i]; + if (builder.kind == Kind::kArray) { + path += "/[]"; + } else { + auto struct_builder = Cast<Kind::kObject>(builder); + auto field_index = field_index_; + if (i + 1 < field_index_stack_.size()) { + field_index = field_index_stack_[i + 1]; + } + path += "/" + struct_builder->FieldName(field_index); + } + } + return path; + } + + protected: + template <typename Handler, typename Stream> + Status DoParse(Handler& handler, Stream&& json) { + constexpr auto parse_flags = rj::kParseIterativeFlag | rj::kParseNanAndInfFlag | + rj::kParseStopWhenDoneFlag | + rj::kParseNumbersAsStringsFlag; + + rj::Reader reader; + + for (; num_rows_ < kMaxParserNumRows; ++num_rows_) { + auto ok = reader.Parse<parse_flags>(json, handler); + switch (ok.Code()) { + case rj::kParseErrorNone: + // parse the next object + continue; + case rj::kParseErrorDocumentEmpty: + // parsed all objects, finish + return Status::OK(); + case rj::kParseErrorTermination: + // handler emitted an error + return handler.Error(); + default: + // rj emitted an error + return 
ParseError(rj::GetParseError_En(ok.Code()), " in row ", num_rows_); + } + } + return Status::Invalid("Exceeded maximum rows"); + } + + template <typename Handler> + Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) { + RETURN_NOT_OK(ReserveScalarStorage(json->size())); + rj::MemoryStream ms(reinterpret_cast<const char*>(json->data()), json->size()); + using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>; + return DoParse(handler, InputStream(ms)); + } + + /// \defgroup handlerbase-append-methods append non-nested values + /// + /// @{ + + template <Kind::type kind> + Status AppendScalar(BuilderPtr builder, string_view scalar) { + if (ARROW_PREDICT_FALSE(builder.kind != kind)) { + return IllegallyChangedTo(kind); + } + auto index = static_cast<int32_t>(scalar_values_builder_.length()); + auto value_length = static_cast<int32_t>(scalar.size()); + RETURN_NOT_OK(Cast<kind>(builder)->Append(index, value_length)); + RETURN_NOT_OK(scalar_values_builder_.Reserve(1)); + scalar_values_builder_.UnsafeAppend(scalar); + return Status::OK(); + } + + /// @} + + Status StartObjectImpl() { + constexpr auto kind = Kind::kObject; + if (ARROW_PREDICT_FALSE(builder_.kind != kind)) { + return IllegallyChangedTo(kind); + } + auto struct_builder = Cast<kind>(builder_); + absent_fields_stack_.Push(struct_builder->num_fields(), true); + StartNested(); + return struct_builder->Append(); + } + + /// \brief helper for Key() functions + /// + /// sets the field builder with name key, or returns false if + /// there is no field with that name + bool SetFieldBuilder(string_view key, bool* duplicate_keys) { + auto parent = Cast<Kind::kObject>(builder_stack_.back()); + field_index_ = parent->GetFieldIndex(std::string(key)); + if (ARROW_PREDICT_FALSE(field_index_ == -1)) { + return false; + } + *duplicate_keys = !absent_fields_stack_[field_index_]; + if (*duplicate_keys) { + status_ = ParseError("Column(", Path(), ") was specified twice in row ", num_rows_); + 
return false; + } + builder_ = parent->field_builder(field_index_); + absent_fields_stack_[field_index_] = false; + return true; + } + + Status EndObjectImpl() { + auto parent = builder_stack_.back(); + + auto expected_count = absent_fields_stack_.TopSize(); + for (int i = 0; i < expected_count; ++i) { + if (!absent_fields_stack_[i]) { + continue; + } + auto field_builder = Cast<Kind::kObject>(parent)->field_builder(i); + if (ARROW_PREDICT_FALSE(!field_builder.nullable)) { + return ParseError("a required field was absent"); + } + RETURN_NOT_OK(builder_set_.AppendNull(parent, i, field_builder)); + } + absent_fields_stack_.Pop(); + EndNested(); + return Status::OK(); + } + + Status StartArrayImpl() { + constexpr auto kind = Kind::kArray; + if (ARROW_PREDICT_FALSE(builder_.kind != kind)) { + return IllegallyChangedTo(kind); + } + StartNested(); + // append to the list builder in EndArrayImpl + builder_ = Cast<kind>(builder_)->value_builder(); + return Status::OK(); + } + + Status EndArrayImpl(rj::SizeType size) { + EndNested(); + // append to list_builder here + auto list_builder = Cast<Kind::kArray>(builder_); + return list_builder->Append(size); + } + + /// helper method for StartArray and StartObject + /// adds the current builder to a stack so its + /// children can be visited and parsed. 
+ void StartNested() { + field_index_stack_.push_back(field_index_); + field_index_ = -1; + builder_stack_.push_back(builder_); + } + + /// helper method for EndArray and EndObject + /// replaces the current builder with its parent + /// so parsing of the parent can continue + void EndNested() { + field_index_ = field_index_stack_.back(); + field_index_stack_.pop_back(); + builder_ = builder_stack_.back(); + builder_stack_.pop_back(); + } + + Status IllegallyChangedTo(Kind::type illegally_changed_to) { + return ParseError("Column(", Path(), ") changed from ", Kind::Name(builder_.kind), + " to ", Kind::Name(illegally_changed_to), " in row ", num_rows_); + } + + /// Reserve storage for scalars, these can occupy almost all of the JSON buffer + Status ReserveScalarStorage(int64_t size) override { + auto available_storage = scalar_values_builder_.value_data_capacity() - + scalar_values_builder_.value_data_length(); + if (size <= available_storage) { + return Status::OK(); + } + return scalar_values_builder_.ReserveData(size - available_storage); + } + + Status status_; + RawBuilderSet builder_set_; + BuilderPtr builder_; + // top of this stack is the parent of builder_ + std::vector<BuilderPtr> builder_stack_; + // top of this stack refers to the fields of the highest *StructBuilder* + // in builder_stack_ (list builders don't have absent fields) + BitsetStack absent_fields_stack_; + // index of builder_ within its parent + int field_index_; + // top of this stack == field_index_ + std::vector<int> field_index_stack_; + StringBuilder scalar_values_builder_; +}; + +template <UnexpectedFieldBehavior> +class Handler; + +template <> +class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase { + public: + using HandlerBase::HandlerBase; + + Status Parse(const std::shared_ptr<Buffer>& json) override { + return DoParse(*this, json); + } + + /// \ingroup rapidjson-handler-interface + /// + /// if an unexpected field is encountered, emit a parse error and bail + bool 
Key(const char* key, rj::SizeType len, ...) { + bool duplicate_keys = false; + if (ARROW_PREDICT_FALSE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) { + return true; + } + if (!duplicate_keys) { + status_ = ParseError("unexpected field"); + } + return false; + } +}; + +template <> +class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase { + public: + using HandlerBase::HandlerBase; + + Status Parse(const std::shared_ptr<Buffer>& json) override { + return DoParse(*this, json); + } + + bool Null() { + if (Skipping()) { + return true; + } + return HandlerBase::Null(); + } + + bool Bool(bool value) { + if (Skipping()) { + return true; + } + return HandlerBase::Bool(value); + } + + bool RawNumber(const char* data, rj::SizeType size, ...) { + if (Skipping()) { + return true; + } + return HandlerBase::RawNumber(data, size); + } + + bool String(const char* data, rj::SizeType size, ...) { + if (Skipping()) { + return true; + } + return HandlerBase::String(data, size); + } + + bool StartObject() { + ++depth_; + if (Skipping()) { + return true; + } + return HandlerBase::StartObject(); + } + + /// \ingroup rapidjson-handler-interface + /// + /// if an unexpected field is encountered, skip until its value has been consumed + bool Key(const char* key, rj::SizeType len, ...) { + MaybeStopSkipping(); + if (Skipping()) { + return true; + } + bool duplicate_keys = false; + if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) { + return true; + } + if (ARROW_PREDICT_FALSE(duplicate_keys)) { + return false; + } + skip_depth_ = depth_; + return true; + } + + bool EndObject(...) 
{ + MaybeStopSkipping(); + --depth_; + if (Skipping()) { + return true; + } + return HandlerBase::EndObject(); + } + + bool StartArray() { + if (Skipping()) { + return true; + } + return HandlerBase::StartArray(); + } + + bool EndArray(rj::SizeType size) { + if (Skipping()) { + return true; + } + return HandlerBase::EndArray(size); + } + + private: + bool Skipping() { return depth_ >= skip_depth_; } + + void MaybeStopSkipping() { + if (skip_depth_ == depth_) { + skip_depth_ = std::numeric_limits<int>::max(); + } + } + + int depth_ = 0; + int skip_depth_ = std::numeric_limits<int>::max(); +}; + +template <> +class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase { + public: + using HandlerBase::HandlerBase; + + Status Parse(const std::shared_ptr<Buffer>& json) override { + return DoParse(*this, json); + } + + bool Bool(bool value) { + if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kBoolean>())) { + return false; + } + return HandlerBase::Bool(value); + } + + bool RawNumber(const char* data, rj::SizeType size, ...) { + if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) { + return false; + } + return HandlerBase::RawNumber(data, size); + } + + bool String(const char* data, rj::SizeType size, ...) { + if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) { + return false; + } + return HandlerBase::String(data, size); + } + + bool StartObject() { + if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kObject>())) { + return false; + } + return HandlerBase::StartObject(); + } + + /// \ingroup rapidjson-handler-interface + /// + /// If an unexpected field is encountered, add a new builder to + /// the current parent builder. It is added as a NullBuilder with + /// (parent.length - 1) leading nulls. The next value parsed + /// will probably trigger promotion of this field from null + bool Key(const char* key, rj::SizeType len, ...) 
{ + bool duplicate_keys = false; + if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len), &duplicate_keys))) { + return true; + } + if (ARROW_PREDICT_FALSE(duplicate_keys)) { + return false; + } + auto struct_builder = Cast<Kind::kObject>(builder_stack_.back()); + auto leading_nulls = static_cast<uint32_t>(struct_builder->length() - 1); + builder_ = BuilderPtr(Kind::kNull, leading_nulls, true); + field_index_ = struct_builder->AddField(std::string(key, len), builder_); + return true; + } + + bool StartArray() { + if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kArray>())) { + return false; + } + return HandlerBase::StartArray(); + } + + private: + // return true if a terminal error was encountered + template <Kind::type kind> + bool MaybePromoteFromNull() { + if (ARROW_PREDICT_TRUE(builder_.kind != Kind::kNull)) { + return false; + } + auto parent = builder_stack_.back(); + if (parent.kind == Kind::kArray) { + auto list_builder = Cast<Kind::kArray>(parent); + DCHECK_EQ(list_builder->value_builder(), builder_); + status_ = builder_set_.MakeBuilder<kind>(builder_.index, &builder_); + if (ARROW_PREDICT_FALSE(!status_.ok())) { + return true; + } + list_builder = Cast<Kind::kArray>(parent); + list_builder->value_builder(builder_); + } else { + auto struct_builder = Cast<Kind::kObject>(parent); + DCHECK_EQ(struct_builder->field_builder(field_index_), builder_); + status_ = builder_set_.MakeBuilder<kind>(builder_.index, &builder_); + if (ARROW_PREDICT_FALSE(!status_.ok())) { + return true; + } + struct_builder = Cast<Kind::kObject>(parent); + struct_builder->field_builder(field_index_, builder_); + } + return false; + } +}; + +Status BlockParser::Make(MemoryPool* pool, const ParseOptions& options, + std::unique_ptr<BlockParser>* out) { + DCHECK(options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType || + options.explicit_schema != nullptr); + + switch (options.unexpected_field_behavior) { + case UnexpectedFieldBehavior::Ignore: { + *out = 
make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(pool); + break; + } + case UnexpectedFieldBehavior::Error: { + *out = make_unique<Handler<UnexpectedFieldBehavior::Error>>(pool); + break; + } + case UnexpectedFieldBehavior::InferType: + *out = make_unique<Handler<UnexpectedFieldBehavior::InferType>>(pool); + break; + } + return static_cast<HandlerBase&>(**out).Initialize(options.explicit_schema); +} + +Status BlockParser::Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out) { + return BlockParser::Make(default_memory_pool(), options, out); +} + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h new file mode 100644 index 00000000000..4dd14e4b80c --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/parser.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> +#include <string> + +#include "arrow/json/options.h" +#include "arrow/status.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +class MemoryPool; +class KeyValueMetadata; +class ResizableBuffer; + +namespace json { + +struct Kind { + enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject }; + + static const std::string& Name(Kind::type); + + static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type); + + static Kind::type FromTag(const std::shared_ptr<const KeyValueMetadata>& tag); + + static Status ForType(const DataType& type, Kind::type* kind); +}; + +constexpr int32_t kMaxParserNumRows = 100000; + +/// \class BlockParser +/// \brief A reusable block-based parser for JSON data +/// +/// The parser takes a block of newline delimited JSON data and extracts Arrays +/// of unconverted strings which can be fed to a Converter to obtain a usable Array. +/// +/// Note that in addition to parse errors (such as malformed JSON) some conversion +/// errors are caught at parse time: +/// - A null value in non-nullable column +/// - Change in the JSON kind of a column. For example, if an explicit schema is provided +/// which stipulates that field "a" is integral, a row of {"a": "not a number"} will +/// result in an error. This also applies to fields outside an explicit schema. 
class ARROW_EXPORT BlockParser {
 public:
  virtual ~BlockParser() = default;

  /// \brief Reserve storage for scalars parsed from a block of json
  ///
  /// \param[in] nbytes number of bytes of scalar data to make room for
  virtual Status ReserveScalarStorage(int64_t nbytes) = 0;

  /// \brief Parse a block of data
  ///
  /// May be called more than once before Finish(); the JSON reader in this
  /// library parses a straddling object and then the whole objects of a block
  /// with the same parser.
  virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;

  /// \brief Extract parsed data
  ///
  /// \param[out] parsed receives the array holding everything parsed so far
  virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;

  /// \brief Return the number of parsed rows
  int32_t num_rows() const { return num_rows_; }

  /// \brief Construct a BlockParser
  ///
  /// \param[in] pool MemoryPool to use when constructing parsed array
  /// \param[in] options ParseOptions to use when parsing JSON
  /// \param[out] out constructed BlockParser
  static Status Make(MemoryPool* pool, const ParseOptions& options,
                     std::unique_ptr<BlockParser>* out);

  /// \brief Construct a BlockParser which allocates from the default memory pool
  ///
  /// \param[in] options ParseOptions to use when parsing JSON
  /// \param[out] out constructed BlockParser
  static Status Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out);

 protected:
  ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);

  // Only concrete parsers create instances; clients go through Make().
  explicit BlockParser(MemoryPool* pool) : pool_(pool) {}

  // pool given at construction (not owned)
  MemoryPool* pool_;
  // number of rows parsed so far, reported by num_rows()
  int32_t num_rows_ = 0;
};

}  // namespace json
}  // namespace arrow
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h
new file mode 100644
index 00000000000..9ed81d000c5
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/rapidjson_defs.h
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Include this file before including any RapidJSON headers. + +#pragma once + +#define RAPIDJSON_HAS_STDSTRING 1 +#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1 +#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1 + +// rapidjson will be defined in namespace arrow::rapidjson +#define RAPIDJSON_NAMESPACE arrow::rapidjson +#define RAPIDJSON_NAMESPACE_BEGIN \ + namespace arrow { \ + namespace rapidjson { +#define RAPIDJSON_NAMESPACE_END \ + } \ + } + +// enable SIMD whitespace skipping, if available +#if defined(ARROW_HAVE_SSE4_2) +#define RAPIDJSON_SSE2 1 +#define RAPIDJSON_SSE42 1 +#endif + +#if defined(ARROW_HAVE_NEON) +#define RAPIDJSON_NEON 1 +#endif diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc new file mode 100644 index 00000000000..51c77fa4df9 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.cc @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/reader.h" + +#include <utility> +#include <vector> + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/io/interfaces.h" +#include "arrow/json/chunked_builder.h" +#include "arrow/json/chunker.h" +#include "arrow/json/converter.h" +#include "arrow/json/parser.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" +#include "arrow/util/task_group.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { + +using util::string_view; + +using internal::checked_cast; +using internal::GetCpuThreadPool; +using internal::TaskGroup; +using internal::ThreadPool; + +namespace json { + +class TableReaderImpl : public TableReader, + public std::enable_shared_from_this<TableReaderImpl> { + public: + TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options, + const ParseOptions& parse_options, + std::shared_ptr<TaskGroup> task_group) + : pool_(pool), + read_options_(read_options), + parse_options_(parse_options), + chunker_(MakeChunker(parse_options_)), + task_group_(std::move(task_group)) {} + + Status Init(std::shared_ptr<io::InputStream> input) { + ARROW_ASSIGN_OR_RAISE(auto it, + io::MakeInputStreamIterator(input, read_options_.block_size)); + return MakeReadaheadIterator(std::move(it), task_group_->parallelism()) + .Value(&block_iterator_); + } + + Result<std::shared_ptr<Table>> Read() override { + 
RETURN_NOT_OK(MakeBuilder()); + + ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next()); + if (block == nullptr) { + return Status::Invalid("Empty JSON file"); + } + + auto self = shared_from_this(); + auto empty = std::make_shared<Buffer>(""); + + int64_t block_index = 0; + std::shared_ptr<Buffer> partial = empty; + + while (block != nullptr) { + std::shared_ptr<Buffer> next_block, whole, completion, next_partial; + + ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next()); + + if (next_block == nullptr) { + // End of file reached => compute completion from penultimate block + RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole)); + } else { + std::shared_ptr<Buffer> starts_with_whole; + // Get completion of partial from previous block. + RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion, + &starts_with_whole)); + + // Get all whole objects entirely inside the current buffer + RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial)); + } + + // Launch parse task + task_group_->Append([self, partial, completion, whole, block_index] { + return self->ParseAndInsert(partial, completion, whole, block_index); + }); + block_index++; + + partial = next_partial; + block = next_block; + } + + std::shared_ptr<ChunkedArray> array; + RETURN_NOT_OK(builder_->Finish(&array)); + return Table::FromChunkedStructArray(array); + } + + private: + Status MakeBuilder() { + auto type = parse_options_.explicit_schema + ? struct_(parse_options_.explicit_schema->fields()) + : struct_({}); + + auto promotion_graph = + parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType + ? 
GetPromotionGraph() + : nullptr; + + return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_); + } + + Status ParseAndInsert(const std::shared_ptr<Buffer>& partial, + const std::shared_ptr<Buffer>& completion, + const std::shared_ptr<Buffer>& whole, int64_t block_index) { + std::unique_ptr<BlockParser> parser; + RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser)); + RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() + + whole->size())); + + if (partial->size() != 0 || completion->size() != 0) { + std::shared_ptr<Buffer> straddling; + if (partial->size() == 0) { + straddling = completion; + } else if (completion->size() == 0) { + straddling = partial; + } else { + ARROW_ASSIGN_OR_RAISE(straddling, + ConcatenateBuffers({partial, completion}, pool_)); + } + RETURN_NOT_OK(parser->Parse(straddling)); + } + + if (whole->size() != 0) { + RETURN_NOT_OK(parser->Parse(whole)); + } + + std::shared_ptr<Array> parsed; + RETURN_NOT_OK(parser->Finish(&parsed)); + builder_->Insert(block_index, field("", parsed->type()), parsed); + return Status::OK(); + } + + MemoryPool* pool_; + ReadOptions read_options_; + ParseOptions parse_options_; + std::unique_ptr<Chunker> chunker_; + std::shared_ptr<TaskGroup> task_group_; + Iterator<std::shared_ptr<Buffer>> block_iterator_; + std::shared_ptr<ChunkedArrayBuilder> builder_; +}; + +Status TableReader::Read(std::shared_ptr<Table>* out) { return Read().Value(out); } + +Result<std::shared_ptr<TableReader>> TableReader::Make( + MemoryPool* pool, std::shared_ptr<io::InputStream> input, + const ReadOptions& read_options, const ParseOptions& parse_options) { + std::shared_ptr<TableReaderImpl> ptr; + if (read_options.use_threads) { + ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options, + TaskGroup::MakeThreaded(GetCpuThreadPool())); + } else { + ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options, + TaskGroup::MakeSerial()); + } + 
RETURN_NOT_OK(ptr->Init(input)); + return ptr; +} + +Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input, + const ReadOptions& read_options, + const ParseOptions& parse_options, + std::shared_ptr<TableReader>* out) { + return TableReader::Make(pool, input, read_options, parse_options).Value(out); +} + +Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options, + std::shared_ptr<Buffer> json) { + std::unique_ptr<BlockParser> parser; + RETURN_NOT_OK(BlockParser::Make(options, &parser)); + RETURN_NOT_OK(parser->Parse(json)); + std::shared_ptr<Array> parsed; + RETURN_NOT_OK(parser->Finish(&parsed)); + + auto type = + options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({}); + auto promotion_graph = + options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType + ? GetPromotionGraph() + : nullptr; + std::shared_ptr<ChunkedArrayBuilder> builder; + RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(), + promotion_graph, type, &builder)); + + builder->Insert(0, field("", type), parsed); + std::shared_ptr<ChunkedArray> converted_chunked; + RETURN_NOT_OK(builder->Finish(&converted_chunked)); + const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0)); + + std::vector<std::shared_ptr<Array>> columns(converted.num_fields()); + for (int i = 0; i < converted.num_fields(); ++i) { + columns[i] = converted.field(i); + } + return RecordBatch::Make(schema(converted.type()->fields()), converted.length(), + std::move(columns)); +} + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h new file mode 100644 index 00000000000..c40338c1e1c --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/reader.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "arrow/json/options.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class MemoryPool; +class Table; +class RecordBatch; +class Array; +class DataType; + +namespace io { +class InputStream; +} // namespace io + +namespace json { + +/// A class that reads an entire JSON file into a Arrow Table +/// +/// The file is expected to consist of individual line-separated JSON objects +class ARROW_EXPORT TableReader { + public: + virtual ~TableReader() = default; + + /// Read the entire JSON file and convert it to a Arrow Table + virtual Result<std::shared_ptr<Table>> Read() = 0; + + ARROW_DEPRECATED("Use Result-returning version") + Status Read(std::shared_ptr<Table>* out); + + /// Create a TableReader instance + static Result<std::shared_ptr<TableReader>> Make(MemoryPool* pool, + std::shared_ptr<io::InputStream> input, + const ReadOptions&, + const ParseOptions&); + + ARROW_DEPRECATED("Use Result-returning version") + static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input, + const ReadOptions&, const ParseOptions&, + std::shared_ptr<TableReader>* out); +}; + +ARROW_EXPORT 
Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options, + std::shared_ptr<Buffer> json); + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h b/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h new file mode 100644 index 00000000000..67e2e1bb406 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/json/type_fwd.h @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace arrow { +namespace json { + +class TableReader; +struct ReadOptions; +struct ParseOptions; + +} // namespace json +} // namespace arrow diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h b/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h new file mode 100644 index 00000000000..addded94943 --- /dev/null +++ b/contrib/libs/apache/arrow/cpp/src/arrow/util/bitset_stack.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <array> +#include <bitset> +#include <cassert> +#include <cstdint> +#include <cstring> +#include <memory> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/compare.h" +#include "arrow/util/functional.h" +#include "arrow/util/macros.h" +#include "arrow/util/string_builder.h" +#include "arrow/util/string_view.h" +#include "arrow/util/type_traits.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace internal { + +/// \brief Store a stack of bitsets efficiently. The top bitset may be +/// accessed and its bits may be modified, but it may not be resized. 
class BitsetStack {
 public:
  using reference = typename std::vector<bool>::reference;

  /// \brief push a bitset onto the stack
  /// \param size number of bits in the next bitset (must not be negative)
  /// \param value initial value for bits in the pushed bitset
  void Push(int size, bool value) {
    // A negative size would shrink the storage into the bitsets below.
    assert(size >= 0);
    const int base = bit_count();
    // The new bitset starts where the bits of the previous ones end.
    offsets_.push_back(base);
    bits_.resize(base + size, value);
  }

  /// \brief number of bits in the bitset at the top of the stack
  ///
  /// Returns 0 when the stack is empty.
  int TopSize() const {
    if (offsets_.empty()) return 0;
    return bit_count() - offsets_.back();
  }

  /// \brief pop a bitset off the stack
  ///
  /// Popping an empty stack is a no-op (consistent with TopSize(), which also
  /// tolerates an empty stack) instead of undefined behavior.
  void Pop() {
    if (offsets_.empty()) return;
    bits_.resize(offsets_.back());
    offsets_.pop_back();
  }

  /// \brief get the value of a bit in the top bitset
  /// \param i index of the bit to access
  ///
  /// The stack must not be empty; the index is not checked.
  bool operator[](int i) const { return bits_[offsets_.back() + i]; }

  /// \brief get a mutable reference to a bit in the top bitset
  /// \param i index of the bit to access
  ///
  /// The stack must not be empty; the index is not checked.
  reference operator[](int i) { return bits_[offsets_.back() + i]; }

 private:
  // total number of bits stored across all bitsets on the stack
  int bit_count() const { return static_cast<int>(bits_.size()); }
  // bits of all bitsets, stored back to back
  std::vector<bool> bits_;
  // offsets_[k] is the position in bits_ where the k-th bitset starts
  std::vector<int> offsets_;
};

}  // namespace internal
}  // namespace arrow