diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-04-02 17:56:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-02 17:56:56 +0300 |
commit | 4ea7112b62176d32d08aac4b642981c58a30046e (patch) | |
tree | 93efc5c18e992fee7d70c91355d131c8a3fe3fec | |
parent | e3295b3556127786c9f4eb6ebc71f1036660b1a7 (diff) | |
download | ydb-4ea7112b62176d32d08aac4b642981c58a30046e.tar.gz |
don't use padded strings when a lot of further data follows in parsing st… (#16660)
3 files changed, 62 insertions, 18 deletions
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp index abab451ebd8..0bbf7abb273 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp @@ -10,13 +10,39 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns { -TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders( - const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const { +class TSimdBuffers: public TDataBuilder::IBuffers { +private: + std::vector<simdjson::padded_string> PaddedStrings; + std::vector<TString> Strings; + +public: + TSimdBuffers(std::vector<simdjson::padded_string>&& paddedStrings, std::vector<TString>&& strings) + : PaddedStrings(std::move(paddedStrings)) + , Strings(std::move(strings)) { + } +}; + +TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const { auto arr = std::static_pointer_cast<arrow::BinaryArray>(sourceArray); std::optional<bool> isBinaryJson; if (arr->type()->id() == arrow::utf8()->id()) { isBinaryJson = false; } + if (!arr->length()) { + return TConclusionStatus::Success(); + } + simdjson::ondemand::parser simdParser; + std::vector<simdjson::padded_string> paddedStrings; + std::vector<TString> forceSIMDStrings; + ui32 sumBuf = 0; + ui32 paddedBorder = 0; + for (i32 i = arr->length() - 1; i >= 1; --i) { + sumBuf += arr->GetView(i).size(); + if (sumBuf > simdjson::SIMDJSON_PADDING) { + paddedBorder = i; + break; + } + } for (ui32 i = 0; i < arr->length(); ++i) { const auto view = arr->GetView(i); if (view.size() && !arr->IsNull(i)) { @@ -27,6 +53,7 @@ TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders( TString json; if (*isBinaryJson && ForceSIMDJsonParsing) { json = NBinaryJson::SerializeToJson(sbJson); + forceSIMDStrings.emplace_back(json); sbJson = TStringBuf(json.data(), 
json.size()); } if (!json && *isBinaryJson) { @@ -47,7 +74,14 @@ TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders( } } else { std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators; - auto doc = dataBuilder.ParseJsonOnDemand(sbJson); + simdjson::simdjson_result<simdjson::ondemand::document> doc; + if (i < paddedBorder) { + doc = simdParser.iterate( + simdjson::padded_string_view(sbJson.data(), sbJson.size(), sbJson.size() + simdjson::SIMDJSON_PADDING)); + } else { + paddedStrings.emplace_back(simdjson::padded_string(sbJson.data(), sbJson.size())); + doc = simdParser.iterate(paddedStrings.back()); + } auto conclusion = TSIMDExtractor(doc, FirstLevelOnly).Fill(dataBuilder, iterators); if (conclusion.IsFail()) { return conclusion; @@ -56,6 +90,9 @@ TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders( } dataBuilder.StartNextRecord(); } + if (paddedStrings.size()) { + dataBuilder.StoreBuffer(std::make_shared<TSimdBuffers>(std::move(paddedStrings), std::move(forceSIMDStrings))); + } return TConclusionStatus::Success(); } diff --git a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp index 9b7dbc192c4..5b858f8787a 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp @@ -5,6 +5,13 @@ #include <ydb/core/formats/arrow/accessor/plain/accessor.h> #include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> +#include <contrib/libs/simdjson/include/simdjson/dom/array-inl.h> +#include <contrib/libs/simdjson/include/simdjson/dom/document-inl.h> +#include <contrib/libs/simdjson/include/simdjson/dom/element-inl.h> +#include <contrib/libs/simdjson/include/simdjson/dom/object-inl.h> +#include <contrib/libs/simdjson/include/simdjson/dom/parser-inl.h> +#include <contrib/libs/simdjson/include/simdjson/ondemand.h> + namespace NKikimr::NArrow::NAccessor::NSubColumns { void 
TColumnElements::BuildSparsedAccessor(const ui32 recordsCount) { @@ -142,4 +149,9 @@ TStringBuf TDataBuilder::AddKey(const TStringBuf currentPrefix, const TStringBuf return TStringBuf(it->second.data(), it->second.size()); } +TDataBuilder::TDataBuilder(const std::shared_ptr<arrow::DataType>& type, const TSettings& settings) + : Type(type) + , Settings(settings) { +} + } // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h index 74da890a79e..59a1d9ea93d 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h @@ -7,12 +7,6 @@ #include <ydb/core/formats/arrow/arrow_helpers.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> -#include <contrib/libs/simdjson/include/simdjson/dom/array-inl.h> -#include <contrib/libs/simdjson/include/simdjson/dom/document-inl.h> -#include <contrib/libs/simdjson/include/simdjson/dom/element-inl.h> -#include <contrib/libs/simdjson/include/simdjson/dom/object-inl.h> -#include <contrib/libs/simdjson/include/simdjson/dom/parser-inl.h> -#include <contrib/libs/simdjson/include/simdjson/ondemand.h> #include <contrib/libs/xxhash/xxhash.h> #include <util/string/join.h> @@ -52,6 +46,12 @@ public: }; class TDataBuilder { +public: + class IBuffers { + public: + virtual ~IBuffers() = default; + }; + private: class TStorageAddress { private: @@ -82,18 +82,13 @@ private: std::deque<TString> StorageStrings; const std::shared_ptr<arrow::DataType> Type; const TSettings Settings; - std::deque<simdjson::padded_string> PaddedStrings; - simdjson::ondemand::parser Parser; + std::vector<std::shared_ptr<IBuffers>> Buffers; public: - TDataBuilder(const std::shared_ptr<arrow::DataType>& type, const TSettings& settings) - : Type(type) - , Settings(settings) { - } + TDataBuilder(const std::shared_ptr<arrow::DataType>& type, 
const TSettings& settings); - simdjson::simdjson_result<simdjson::ondemand::document> ParseJsonOnDemand(const TStringBuf sv) { - PaddedStrings.emplace_back(simdjson::padded_string(sv.data(), sv.size())); - return Parser.iterate(PaddedStrings.back()); + void StoreBuffer(const std::shared_ptr<IBuffers>& data) { + Buffers.emplace_back(data); } void StartNextRecord() { |