diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-02-15 14:13:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-15 14:13:19 +0300 |
commit | 0d2f9d1286625ae67acf70f837ad6f856853ee04 (patch) | |
tree | a6fcd2fbf54130d590d44205b03bb5f4a10d4ea3 | |
parent | 709b41317cbba34d1204f3b80146db8e684310be (diff) | |
download | ydb-0d2f9d1286625ae67acf70f837ad6f856853ee04.tar.gz |
special json accessors (#13933)
write json as subcolumns accessor for fast way select columns data
102 files changed, 4292 insertions, 670 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 7996d68465..36c437d03a 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -28,6 +28,8 @@ ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare ydb/core/kqp/ut/olap KqpOlap.TableSinkWithOlapStore +ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization +ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1 ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts diff --git a/ydb/core/formats/arrow/accessor/abstract/constructor.h b/ydb/core/formats/arrow/accessor/abstract/constructor.h index 8cbef70a2a..c8c4aeb344 100644 --- a/ydb/core/formats/arrow/accessor/abstract/constructor.h +++ b/ydb/core/formats/arrow/accessor/abstract/constructor.h @@ -1,8 +1,8 @@ #pragma once -#include <ydb/library/formats/arrow/protos/accessor.pb.h> #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> #include <ydb/library/formats/arrow/accessor/common/chunk_data.h> +#include <ydb/library/formats/arrow/protos/accessor.pb.h> #include <ydb/services/bg_tasks/abstract/interface.h> #include <library/cpp/object_factory/object_factory.h> @@ -15,27 +15,33 @@ public: using TProto = NKikimrArrowAccessorProto::TConstructor; private: - virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const = 0; - virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstructDefault( - const TChunkConstructionData& externalInfo) const = 0; + YDB_READONLY(IChunkedArray::EType, Type, IChunkedArray::EType::Undefined); + + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const = 0; + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstructDefault(const TChunkConstructionData& externalInfo) const = 0; virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const = 0; virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) = 0; - virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const = 0; virtual TString DoDebugString() const { return ""; } virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0; - virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( - const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0; + virtual TString DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0; + + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstruct( + const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const = 0; public: + IConstructor(const IChunkedArray::EType type) + : Type(type) { + } + virtual ~IConstructor() = default; - std::shared_ptr<arrow::RecordBatch> Construct( - const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { AFL_VERIFY(columnData); - return DoConstruct(columnData, externalInfo); + AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type); + return DoSerializeToString(columnData, externalInfo); } bool IsEqualWithSameTypeTo(const IConstructor& item) const { @@ -46,15 +52,25 @@ public: return TStringBuilder() << GetClassName() << ":" << DoDebugString(); } - TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Construct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const { - return DoConstruct(originalData, externalInfo); + TConclusion<std::shared_ptr<IChunkedArray>> DeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const { + return DoDeserializeFromString(originalData, externalInfo); } - TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> ConstructDefault(const TChunkConstructionData& externalInfo) const { + TConclusion<std::shared_ptr<IChunkedArray>> ConstructDefault(const TChunkConstructionData& externalInfo) const { return DoConstructDefault(externalInfo); } + TConclusion<std::shared_ptr<IChunkedArray>> Construct( + const std::shared_ptr<IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(originalArray); + if (originalArray->GetType() == GetType()) { + return originalArray; + } else { + return DoConstruct(originalArray, externalInfo); + } + } + bool DeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) { return DoDeserializeFromProto(proto); } @@ -67,17 +83,13 @@ public: proto = DoSerializeToProto(); } - std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const { - AFL_VERIFY(resultColumn); - return DoGetExpectedSchema(resultColumn); - } - virtual TString GetClassName() const = 0; }; class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> { private: using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>; + public: using TBase::TBase; @@ -94,9 +106,15 @@ public: } } - std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const { + TString SerializeToString(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(!!GetObjectPtr()); + return GetObjectPtr()->SerializeToString(batch, externalInfo); + } + + TConclusion<std::shared_ptr<IChunkedArray>> DeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const { AFL_VERIFY(!!GetObjectPtr()); - return GetObjectPtr()->Construct(batch, externalInfo); + return GetObjectPtr()->DeserializeFromString(originalData, externalInfo); } static TConstructorContainer GetDefaultConstructor(); diff --git a/ydb/core/formats/arrow/accessor/composite_serial/accessor.h b/ydb/core/formats/arrow/accessor/composite_serial/accessor.h index ac7e0193d7..e2908e1184 100644 --- a/ydb/core/formats/arrow/accessor/composite_serial/accessor.h +++ b/ydb/core/formats/arrow/accessor/composite_serial/accessor.h @@ -1,12 +1,14 @@ #pragma once #include <ydb/core/formats/arrow/save_load/loader.h> + #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/composite/accessor.h> namespace NKikimr::NArrow::NAccessor { -class TDeserializeChunkedArray: public NArrow::NAccessor::IChunkedArray { +class TDeserializeChunkedArray: public ICompositeChunkedArray { private: - using TBase = NArrow::NAccessor::IChunkedArray; + using TBase = ICompositeChunkedArray; public: class TChunk { @@ -40,16 +42,19 @@ private: std::vector<TChunk> Chunks; protected: + virtual ui32 DoGetNullsCount() const override { + AFL_VERIFY(false); + return 0; + } + virtual ui32 DoGetValueRawBytes() const override { + AFL_VERIFY(false); + return 0; + } + virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; - virtual std::vector<TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override { - AFL_VERIFY(false); - return {}; - } - virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override { AFL_VERIFY(false)("problem", "cannot use method"); return nullptr; @@ -61,7 +66,7 @@ protected: AFL_VERIFY(false); return nullptr; } - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override { + virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override { AFL_VERIFY(false); return nullptr; } diff --git a/ydb/core/formats/arrow/accessor/plain/accessor.cpp b/ydb/core/formats/arrow/accessor/plain/accessor.cpp index 0e83b81b09..e2e3cd672d 100644 --- a/ydb/core/formats/arrow/accessor/plain/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/plain/accessor.cpp @@ -1,6 +1,7 @@ #include "accessor.h" #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/save_load/loader.h> #include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/formats/arrow/splitter/simple.h> @@ -10,24 +11,15 @@ std::optional<ui64> TTrivialArray::DoGetRawSize() const { return NArrow::GetArrayDataSize(Array); } -std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TTrivialArray::DoSplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) { - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("f", GetDataType()) })); - auto chunks = NArrow::NSplitter::TSimpleSplitter(saver).SplitBySizes( - arrow::RecordBatch::Make(schema, GetRecordsCount(), { Array }), fullSerializedData, splitSizes); - std::vector<TChunkedArraySerialized> result; - for (auto&& i : chunks) { - AFL_VERIFY(i.GetSlicedBatch()->num_columns() == 1); - result.emplace_back(std::make_shared<TTrivialArray>(i.GetSlicedBatch()->column(0)), i.GetSerializedChunk()); - } - return result; -} - std::shared_ptr<arrow::Scalar> TTrivialArray::DoGetMaxScalar() const { auto minMaxPos = NArrow::FindMinMaxPosition(Array); return NArrow::TStatusValidator::GetValid(Array->GetScalar(minMaxPos.second)); } +ui32 TTrivialArray::DoGetValueRawBytes() const { + return NArrow::GetArrayDataSize(Array); +} + namespace { class TChunkAccessor { private: @@ -85,4 +77,12 @@ std::shared_ptr<arrow::Scalar> TTrivialChunkedArray::DoGetMaxScalar() const { return result; } +ui32 TTrivialChunkedArray::DoGetValueRawBytes() const { + ui32 result = 0; + for (auto&& i : Array->chunks()) { + result += NArrow::GetArrayDataSize(i); + } + return result; +} + } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/plain/accessor.h b/ydb/core/formats/arrow/accessor/plain/accessor.h index 73b8510080..12ad939f39 100644 --- a/ydb/core/formats/arrow/accessor/plain/accessor.h +++ b/ydb/core/formats/arrow/accessor/plain/accessor.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/arrow_helpers.h> #include <ydb/library/formats/arrow/validation/validation.h> namespace NKikimr::NArrow::NAccessor { @@ -12,24 +13,21 @@ private: protected: virtual std::optional<ui64> DoGetRawSize() const override; - virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override { + virtual TLocalDataAddress DoGetLocalData( + const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override { return TLocalDataAddress(Array, 0, 0); } - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override { - return std::make_shared<arrow::ChunkedArray>(Array); - } virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override { return NArrow::TStatusValidator::GetValid(Array->GetScalar(index)); } virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override; - virtual std::vector<TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) override; - - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( - const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override { - AFL_VERIFY(false); - return TLocalChunkedArrayAddress(nullptr, TCommonChunkAddress(0, GetRecordsCount(), 0)); + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override { + return std::make_shared<TTrivialArray>(Array->Slice(offset, count)); + } + virtual ui32 DoGetNullsCount() const override { + return Array->null_count(); } + virtual ui32 DoGetValueRawBytes() const override; public: const std::shared_ptr<arrow::Array>& GetArray() const { @@ -40,6 +38,43 @@ public: : TBase(data->length(), EType::Array, data->type()) , Array(data) { } + + template <class TArrowDataType = arrow::StringType> + class TPlainBuilder { + private: + std::unique_ptr<arrow::ArrayBuilder> Builder; + std::optional<ui32> LastRecordIndex; + + public: + TPlainBuilder(const ui32 reserveItems = 0, const ui32 reserveSize = 0) { + Builder = NArrow::MakeBuilder(arrow::TypeTraits<TArrowDataType>::type_singleton(), reserveItems, reserveSize); + } + + void AddRecord(const ui32 recordIndex, const std::string_view value) { + if (LastRecordIndex) { + AFL_VERIFY(*LastRecordIndex < recordIndex)("last", LastRecordIndex)("index", recordIndex); + TStatusValidator::Validate(Builder->AppendNulls(recordIndex - *LastRecordIndex - 1)); + } else { + TStatusValidator::Validate(Builder->AppendNulls(recordIndex)); + } + LastRecordIndex = recordIndex; + AFL_VERIFY(NArrow::Append<TArrowDataType>(*Builder, arrow::util::string_view(value.data(), value.size()))); + } + + std::shared_ptr<IChunkedArray> Finish(const ui32 recordsCount) { + if (LastRecordIndex) { + AFL_VERIFY(*LastRecordIndex < recordsCount)("last", LastRecordIndex)("count", recordsCount); + TStatusValidator::Validate(Builder->AppendNulls(recordsCount - *LastRecordIndex - 1)); + } else { + TStatusValidator::Validate(Builder->AppendNulls(recordsCount)); + } + return std::make_shared<TTrivialArray>(NArrow::FinishBuilder(std::move(Builder))); + } + }; + + static TPlainBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveSize = 0) { + return TPlainBuilder<arrow::StringType>(reserveItems, reserveSize); + } }; class TTrivialChunkedArray: public IChunkedArray { @@ -48,28 +83,21 @@ private: const std::shared_ptr<arrow::ChunkedArray> Array; protected: - virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override { - return Array; + virtual ui32 DoGetValueRawBytes() const override; + virtual ui32 DoGetNullsCount() const override { + return Array->null_count(); } + virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; virtual std::optional<ui64> DoGetRawSize() const override; virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override { auto chunk = GetChunkSlow(index); return NArrow::TStatusValidator::GetValid(chunk.GetArray()->GetScalar(chunk.GetAddress().GetLocalIndex(index))); } - virtual std::vector<TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override { - AFL_VERIFY(false); - return {}; - } - virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override; - - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( - const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override { - AFL_VERIFY(false); - return TLocalChunkedArrayAddress(nullptr, TCommonChunkAddress(0, 0, 0)); + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override { + return std::make_shared<TTrivialChunkedArray>(Array->Slice(offset, count)); } + virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override; public: TTrivialChunkedArray(const std::shared_ptr<arrow::ChunkedArray>& data) diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.cpp b/ydb/core/formats/arrow/accessor/plain/constructor.cpp index c81b65023f..d73c906911 100644 --- a/ydb/core/formats/arrow/accessor/plain/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/plain/constructor.cpp @@ -1,6 +1,8 @@ #include "accessor.h" #include "constructor.h" +#include <ydb/core/formats/arrow/serializer/abstract.h> + #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> #include <ydb/library/formats/arrow/arrow_helpers.h> #include <ydb/library/formats/arrow/simple_arrays_cache.h> @@ -10,10 +12,17 @@ namespace NKikimr::NArrow::NAccessor::NPlain { -TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& /*externalInfo*/) const { - AFL_VERIFY(originalData->num_columns() == 1)("count", originalData->num_columns())("schema", originalData->schema()->ToString()); - return std::make_shared<NArrow::NAccessor::TTrivialArray>(originalData->column(0)); +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const { + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) })); + auto result = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema); + if (!result.ok()) { + return TConclusionStatus::Fail(result.status().ToString()); + } else { + auto rb = TStatusValidator::GetValid(result); + AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString()); + return std::make_shared<NArrow::NAccessor::TTrivialArray>(rb->column(0)); + } } TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const { @@ -29,21 +38,26 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons return true; } -std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const { - return std::make_shared<arrow::Schema>(arrow::FieldVector({ resultColumn })); -} - -std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct( - const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { +TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) })); + std::shared_ptr<arrow::RecordBatch> rb; if (columnData->GetType() == IChunkedArray::EType::Array) { const auto* arr = static_cast<const TTrivialArray*>(columnData.get()); - return arrow::RecordBatch::Make(schema, columnData->GetRecordsCount(), { arr->GetArray() }); + rb = arrow::RecordBatch::Make(schema, columnData->GetRecordsCount(), { arr->GetArray() }); } else { auto chunked = columnData->GetChunkedArray(); auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount()); - return NArrow::ToBatch(table, chunked->num_chunks() > 1); + rb = NArrow::ToBatch(table); } + return externalInfo.GetDefaultSerializer()->SerializePayload(rb); +} + +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( + const std::shared_ptr<IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const { + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) })); + auto chunked = originalArray->GetChunkedArray(); + auto table = arrow::Table::Make(schema, { chunked }, originalArray->GetRecordsCount()); + return std::make_shared<TTrivialArray>(NArrow::ToBatch(table)->column(0)); } } // namespace NKikimr::NArrow::NAccessor::NPlain diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.h b/ydb/core/formats/arrow/accessor/plain/constructor.h index 243c1e47de..e16f8810d6 100644 --- a/ydb/core/formats/arrow/accessor/plain/constructor.h +++ b/ydb/core/formats/arrow/accessor/plain/constructor.h @@ -1,10 +1,14 @@ #pragma once #include <ydb/core/formats/arrow/accessor/abstract/constructor.h> + #include <ydb/library/formats/arrow/accessor/common/const.h> namespace NKikimr::NArrow::NAccessor::NPlain { class TConstructor: public IConstructor { +private: + using TBase = IConstructor; + public: static TString GetClassNameStatic() { return TGlobalConst::PlainDataAccessorName; @@ -17,16 +21,23 @@ private: return true; } - virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstruct( + const std::shared_ptr<IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const override; + + virtual TString DoSerializeToString( const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override; - virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override; + virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const override; virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override; virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) override; - virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const override; + std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const; virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstructDefault(const TChunkConstructionData& externalInfo) const override; public: + TConstructor() + : TBase(IChunkedArray::EType::Array) { + } + virtual TString GetClassName() const override { return GetClassNameStatic(); } diff --git a/ydb/core/formats/arrow/accessor/sparsed/accessor.cpp b/ydb/core/formats/arrow/accessor/sparsed/accessor.cpp index 62c796b811..d7ee574ccd 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/sparsed/accessor.cpp @@ -1,8 +1,9 @@ #include "accessor.h" +#include <ydb/core/formats/arrow/save_load/loader.h> #include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/formats/arrow/splitter/simple.h> -#include <ydb/core/formats/arrow/save_load/saver.h> + #include <ydb/library/formats/arrow/simple_arrays_cache.h> namespace NKikimr::NArrow::NAccessor { @@ -69,46 +70,6 @@ TSparsedArray::TSparsedArray(const IChunkedArray& defaultArray, const std::share Records.emplace_back(0, GetRecordsCount(), records, DefaultValue); } -std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TSparsedArray::DoSplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) { - AFL_VERIFY(Records.size() == 1)("size", Records.size()); - auto chunks = NArrow::NSplitter::TSimpleSplitter(saver).SplitBySizes(Records.front().GetRecords(), fullSerializedData, splitSizes); - - std::vector<TChunkedArraySerialized> result; - ui32 idx = 0; - ui32 startIdx = 0; - for (auto&& i : chunks) { - AFL_VERIFY(i.GetSlicedBatch()->num_columns() == 2); - AFL_VERIFY(i.GetSlicedBatch()->column(0)->type()->id() == arrow::uint32()->id()); - auto UI32Column = static_pointer_cast<arrow::UInt32Array>(i.GetSlicedBatch()->column(0)); - ui32 nextStartIdx = NArrow::NAccessor::TSparsedArray::GetLastIndex(i.GetSlicedBatch()) + 1; - if (idx + 1 == chunks.size()) { - nextStartIdx = GetRecordsCount(); - } - std::shared_ptr<arrow::RecordBatch> batch; - { - std::unique_ptr<arrow::ArrayBuilder> builder = NArrow::MakeBuilder(arrow::uint32()); - arrow::UInt32Builder* builderImpl = (arrow::UInt32Builder*)builder.get(); - for (ui32 rowIdx = 0; rowIdx < UI32Column->length(); ++rowIdx) { - TStatusValidator::Validate(builderImpl->Append(UI32Column->Value(rowIdx) - startIdx)); - } - auto colIndex = TStatusValidator::GetValid(builder->Finish()); - batch = arrow::RecordBatch::Make( - i.GetSlicedBatch()->schema(), i.GetSlicedBatch()->num_rows(), { colIndex, i.GetSlicedBatch()->column(1) }); - } - - ++idx; - { - TBuilder builder(DefaultValue, GetDataType()); - builder.AddChunk(nextStartIdx - startIdx, batch); - result.emplace_back(builder.Finish(), saver.Apply(batch)); - } - startIdx = nextStartIdx; - } - - return result; -} - std::shared_ptr<arrow::Scalar> TSparsedArray::DoGetMaxScalar() const { std::shared_ptr<arrow::Scalar> result; for (auto&& i : Records) { @@ -137,7 +98,7 @@ namespace { static thread_local THashMap<TString, std::shared_ptr<arrow::RecordBatch>> SimpleBatchesCache; } -NKikimr::NArrow::NAccessor::TSparsedArrayChunk TSparsedArray::MakeDefaultChunk( +TSparsedArrayChunk TSparsedArray::MakeDefaultChunk( const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount) { auto it = SimpleBatchesCache.find(type->ToString()); if (it == SimpleBatchesCache.end()) { @@ -159,21 +120,8 @@ IChunkedArray::TLocalDataAddress TSparsedArrayChunk::GetChunk( return IChunkedArray::TLocalDataAddress( NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, it->GetSize()), StartPosition + it->GetStartExt(), chunkIdx); } else { - return IChunkedArray::TLocalDataAddress( - ColValue->Slice(it->GetStartInt(), it->GetSize()), StartPosition + it->GetStartExt(), chunkIdx); - } -} - -std::vector<std::shared_ptr<arrow::Array>> TSparsedArrayChunk::GetChunkedArray() const { - std::vector<std::shared_ptr<arrow::Array>> chunks; - for (auto&& i : RemapExternalToInternal) { - if (i.GetIsDefault()) { - chunks.emplace_back(NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, i.GetSize())); - } else { - chunks.emplace_back(ColValue->Slice(i.GetStartInt(), i.GetSize())); - } + return IChunkedArray::TLocalDataAddress(ColValue->Slice(it->GetStartInt(), it->GetSize()), StartPosition + it->GetStartExt(), chunkIdx); } - return chunks; } TSparsedArrayChunk::TSparsedArrayChunk(const ui32 posStart, const ui32 recordsCount, const std::shared_ptr<arrow::RecordBatch>& records, diff --git a/ydb/core/formats/arrow/accessor/sparsed/accessor.h b/ydb/core/formats/arrow/accessor/sparsed/accessor.h index 07a1a2465c..ee6fbb04b2 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/accessor.h +++ b/ydb/core/formats/arrow/accessor/sparsed/accessor.h @@ -3,6 +3,7 @@ #include <ydb/library/accessor/accessor.h> #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/size_calcer.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> @@ -68,31 +69,86 @@ public: IChunkedArray::TLocalDataAddress GetChunk( const std::optional<IChunkedArray::TCommonChunkAddress>& chunkCurrent, const ui64 position, const ui32 chunkIdx) const; - std::vector<std::shared_ptr<arrow::Array>> GetChunkedArray() const; - TSparsedArrayChunk(const ui32 posStart, const ui32 recordsCount, const std::shared_ptr<arrow::RecordBatch>& records, const std::shared_ptr<arrow::Scalar>& defaultValue); ui64 GetRawSize() const; + + ui32 GetNullsCount() const { + if (!!DefaultValue) { + return ColValue->null_count(); + } else { + AFL_VERIFY(GetNotDefaultRecordsCount() <= GetRecordsCount()); + return GetRecordsCount() - GetNotDefaultRecordsCount(); + } + } + ui32 GetValueRawBytes() const { + return NArrow::GetArrayDataSize(ColValue); + } + + TSparsedArrayChunk Slice(const ui32 newStart, const ui32 offset, const ui32 count) const { + AFL_VERIFY(offset + count <= RecordsCount)("offset", offset)("count", count)("records", RecordsCount); + std::optional<ui32> startPosition = NArrow::FindUpperOrEqualPosition(*UI32ColIndex, offset); + std::optional<ui32> finishPosition = NArrow::FindUpperOrEqualPosition(*UI32ColIndex, offset + count); + if (!startPosition || startPosition == finishPosition) { + return TSparsedArrayChunk(newStart, count, NArrow::MakeEmptyBatch(Records->schema(), 0), DefaultValue); + } else { + AFL_VERIFY(startPosition); + auto builder = NArrow::MakeBuilder(arrow::uint32()); + for (ui32 i = *startPosition; i < finishPosition.value_or(Records->num_rows()); ++i) { + NArrow::Append<arrow::UInt32Type>(*builder, UI32ColIndex->Value(i) - offset); + } + auto arrIndexes = NArrow::FinishBuilder(std::move(builder)); + auto arrValue = ColValue->Slice(*startPosition, finishPosition.value_or(Records->num_rows()) - *startPosition); + auto sliceRecords = arrow::RecordBatch::Make(Records->schema(), arrValue->length(), { arrIndexes, arrValue }); + return TSparsedArrayChunk(newStart, count, sliceRecords, DefaultValue); + } + } }; class TSparsedArray: public IChunkedArray { private: using TBase = IChunkedArray; - std::shared_ptr<arrow::Scalar> DefaultValue; + YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue); std::vector<TSparsedArrayChunk> Records; protected: - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( - const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override { - AFL_VERIFY(false); - return TLocalChunkedArrayAddress(nullptr, 0, 0); - } - virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override; - virtual std::vector<TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) override; + virtual ui32 DoGetNullsCount() const override { + ui32 result = 0; + for (auto&& i : Records) { + result += i.GetNullsCount(); + } + return result; + } + virtual ui32 DoGetValueRawBytes() const override { + ui32 result = 0; + for (auto&& i : Records) { + result += i.GetValueRawBytes(); + } + return result; + } + + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override { + TBuilder builder(DefaultValue, GetDataType()); + ui32 newStart = 0; + for (ui32 i = 0; i < Records.size(); ++i) { + if (Records[i].GetStartPosition() + Records[i].GetRecordsCount() <= offset) { + continue; + } + if (offset + count <= Records[i].GetStartPosition()) { + continue; + } + const ui32 chunkStart = (offset < Records[i].GetStartPosition()) ? 0 : (offset - Records[i].GetStartPosition()); + const ui32 chunkCount = (offset + count <= Records[i].GetFinishPosition()) + ? (offset + count - Records[i].GetStartPosition() - chunkStart) + : (Records[i].GetFinishPosition() - chunkStart); + builder.AddChunk(Records[i].Slice(newStart, chunkStart, chunkCount)); + newStart += chunkCount; + } + return builder.Finish(); + } virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override { ui32 currentIdx = 0; @@ -105,14 +161,6 @@ protected: AFL_VERIFY(false); return TLocalDataAddress(nullptr, 0, 0); } - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override { - std::vector<std::shared_ptr<arrow::Array>> chunks; - for (auto&& i : Records) { - auto chunksLocal = i.GetChunkedArray(); - chunks.insert(chunks.end(), chunksLocal.begin(), chunksLocal.end()); - } - return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType()); - } virtual std::optional<ui64> DoGetRawSize() const override { ui64 bytes = 0; for (auto&& i : Records) { @@ -170,6 +218,42 @@ public: return *it; } + template <class TDataType> + class TSparsedBuilder { + private: + std::unique_ptr<arrow::ArrayBuilder> IndexBuilder; + std::unique_ptr<arrow::ArrayBuilder> ValueBuilder; + ui32 RecordsCount = 0; + const std::shared_ptr<arrow::Scalar> DefaultValue; + + public: + TSparsedBuilder(const std::shared_ptr<arrow::Scalar>& defaultValue, const ui32 reserveItems, const ui32 reserveData) + : DefaultValue(defaultValue) { + IndexBuilder = NArrow::MakeBuilder(arrow::uint32(), reserveItems, 0); + ValueBuilder = NArrow::MakeBuilder(arrow::TypeTraits<TDataType>::type_singleton(), reserveItems, reserveData); + } + + void AddRecord(const ui32 recordIndex, const std::string_view value) { + AFL_VERIFY(NArrow::Append<arrow::UInt32Type>(*IndexBuilder, recordIndex)); + AFL_VERIFY(NArrow::Append<TDataType>(*ValueBuilder, arrow::util::string_view(value.data(), value.size()))); + ++RecordsCount; + } + + std::shared_ptr<IChunkedArray> Finish(const ui32 recordsCount) { + TSparsedArray::TBuilder builder(DefaultValue, arrow::TypeTraits<TDataType>::type_singleton()); + std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders; + builders.emplace_back(std::move(IndexBuilder)); + builders.emplace_back(std::move(ValueBuilder)); + builder.AddChunk(recordsCount, arrow::RecordBatch::Make(TSparsedArray::BuildSchema(arrow::TypeTraits<TDataType>::type_singleton()), + RecordsCount, NArrow::Finish(std::move(builders)))); + return builder.Finish(); + } + }; + + static TSparsedBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveData = 0) { + return TSparsedBuilder<arrow::StringType>(nullptr, reserveItems, reserveData); + } + class TBuilder { private: ui32 RecordsCount = 0; @@ -184,6 +268,10 @@ public: } void AddChunk(const ui32 recordsCount, const std::shared_ptr<arrow::RecordBatch>& data); + void AddChunk(TSparsedArrayChunk&& chunk) { + RecordsCount += chunk.GetRecordsCount(); + Chunks.emplace_back(std::move(chunk)); + } std::shared_ptr<TSparsedArray> Finish() { return std::shared_ptr<TSparsedArray>(new TSparsedArray(std::move(Chunks), DefaultValue, Type, RecordsCount)); diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp index 2962636c36..7a76ce60f2 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp @@ -1,23 +1,27 @@ #include "accessor.h" #include "constructor.h" -namespace NKikimr::NArrow::NAccessor::NSparsed { +#include <ydb/core/formats/arrow/serializer/abstract.h> -std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const { - arrow::FieldVector fields = { std::make_shared<arrow::Field>("index", arrow::uint32()), - std::make_shared<arrow::Field>("value", resultColumn->type()) }; - return std::make_shared<arrow::Schema>(fields); -} +namespace NKikimr::NArrow::NAccessor::NSparsed { TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const { return std::make_shared<TSparsedArray>(externalInfo.GetDefaultValue(), externalInfo.GetColumnType(), externalInfo.GetRecordsCount()); } -TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const { - AFL_VERIFY(originalData->num_columns() == 2)("count", originalData->num_columns())("schema", originalData->schema()->ToString()); +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const { + arrow::FieldVector fields = { std::make_shared<arrow::Field>("index", arrow::uint32()), + std::make_shared<arrow::Field>("value", externalInfo.GetColumnType()) }; + auto schema = std::make_shared<arrow::Schema>(fields); + auto rbParsed = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema); + if (!rbParsed.ok()) { + return TConclusionStatus::Fail(rbParsed.status().ToString()); + } + auto rb = *rbParsed; + AFL_VERIFY(rb->num_columns() == 2)("count", rb->num_columns())("schema", rb->schema()->ToString()); NArrow::NAccessor::TSparsedArray::TBuilder builder(externalInfo.GetDefaultValue(), externalInfo.GetColumnType()); - builder.AddChunk(externalInfo.GetRecordsCount(), originalData); + builder.AddChunk(externalInfo.GetRecordsCount(), rb); return builder.Finish(); } @@ -31,10 +35,15 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons return true; } -std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct( - const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { - NArrow::NAccessor::TSparsedArray sparsed(*columnData, externalInfo.GetDefaultValue()); - return sparsed.GetRecordBatchVerified(); +TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + std::shared_ptr<TSparsedArray> sparsed = std::static_pointer_cast<TSparsedArray>(columnData); + return externalInfo.GetDefaultSerializer()->SerializePayload(sparsed->GetRecordBatchVerified()); +} + +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( + const std::shared_ptr<IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(originalArray); + return std::make_shared<TSparsedArray>(*originalArray, externalInfo.GetDefaultValue()); } } // namespace NKikimr::NArrow::NAccessor::NSparsed diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.h b/ydb/core/formats/arrow/accessor/sparsed/constructor.h index a85b8918ea..6591b31c76 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/constructor.h +++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.h @@ -1,10 +1,13 @@ #pragma once #include <ydb/core/formats/arrow/accessor/abstract/constructor.h> + #include <ydb/library/formats/arrow/accessor/common/const.h> namespace NKikimr::NArrow::NAccessor::NSparsed { class TConstructor: public IConstructor { +private: + using TBase = IConstructor; public: static TString GetClassNameStatic() { return TGlobalConst::SparsedDataAccessorName; @@ -12,24 +15,30 @@ public: private: static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic()); + std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const; + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstruct( + const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const override; virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override { return true; } - virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( + virtual TString DoSerializeToString( const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override; - virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct( - const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override; + virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const override; virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override; virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) override; - virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const override; virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstructDefault(const TChunkConstructionData& externalInfo) const override; public: virtual TString GetClassName() const override { return GetClassNameStatic(); } + + TConstructor() + : TBase(IChunkedArray::EType::SparsedArray) { + } }; } // namespace NKikimr::NArrow::NAccessor::NSparsed diff --git a/ydb/core/formats/arrow/accessor/sparsed/ut/ut_sparsed.cpp b/ydb/core/formats/arrow/accessor/sparsed/ut/ut_sparsed.cpp new file mode 100644 index 0000000000..781424597a --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sparsed/ut/ut_sparsed.cpp @@ -0,0 +1,84 @@ +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <regex> + +Y_UNIT_TEST_SUITE(SparsedArrayAccessor) { + using namespace NKikimr::NArrow::NAccessor; + using namespace NKikimr::NArrow; + + std::string PrepareToCompare(const std::string& str) { + return std::regex_replace(str, std::regex(" |\\n"), ""); + } + + Y_UNIT_TEST(SlicesDef) { + TSparsedArray::TSparsedBuilder<arrow::StringType> builder(std::make_shared<arrow::StringScalar>("aaa"), 10, 0); + builder.AddRecord(5, "abc5"); + builder.AddRecord(6, "abcd6"); + builder.AddRecord(8, "abcde8"); + auto arr = builder.Finish(10); + { + const TString arrString = PrepareToCompare(arr->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[\"aaa\",\"aaa\",\"aaa\",\"aaa\",\"aaa\"],[\"abc5\",\"abcd6\"],[\"aaa\"],[\"abcde8\"],[\"aaa\"]]")( + "string", arrString); + AFL_VERIFY(arr->GetScalar(3)->ToString() == "aaa"); + } + { + auto arrSlice = arr->ISlice(0, 4); + AFL_VERIFY(arrSlice->GetRecordsCount() == 4); + AFL_VERIFY(arrSlice->GetScalar(3)->ToString() == "aaa"); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[\"aaa\",\"aaa\",\"aaa\",\"aaa\"]]")("string", arrString); + } + { + auto arrSlice = arr->ISlice(2, 4); + AFL_VERIFY(arrSlice->GetRecordsCount() == 4); + AFL_VERIFY(arrSlice->GetScalar(3)->ToString() == "abc5"); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[\"aaa\",\"aaa\",\"aaa\"],[\"abc5\"]]")("string", arrString); + } + { + auto arrSlice = arr->ISlice(6, 3); + AFL_VERIFY(arrSlice->GetRecordsCount() == 3); + AFL_VERIFY(arrSlice->GetScalar(0)->ToString() == "abcd6"); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[\"abcd6\"],[\"aaa\"],[\"abcde8\"]]")("string", arrString); + } + } + + Y_UNIT_TEST(SlicesNull) { + TSparsedArray::TSparsedBuilder<arrow::StringType> builder(nullptr, 10, 0); + builder.AddRecord(5, "abc5"); + builder.AddRecord(6, "abcd6"); + builder.AddRecord(8, "abcde8"); + auto arr = builder.Finish(10); + { + const TString arrString = PrepareToCompare(arr->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[null,null,null,null,null],[\"abc5\",\"abcd6\"],[null],[\"abcde8\"],[null]]")("string", arrString); + AFL_VERIFY(!arr->GetScalar(3)); + } + { + auto arrSlice = arr->ISlice(0, 4); + AFL_VERIFY(arrSlice->GetRecordsCount() == 4); + AFL_VERIFY(!arrSlice->GetScalar(3)); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[null,null,null,null]]")("string", arrString); + } + { + auto arrSlice = arr->ISlice(2, 4); + AFL_VERIFY(arrSlice->GetRecordsCount() == 4); + AFL_VERIFY(arrSlice->GetScalar(3)->ToString() == "abc5"); + AFL_VERIFY(!arrSlice->GetScalar(0)); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[null,null,null],[\"abc5\"]]")("string", arrString); + } + { + auto arrSlice = arr->ISlice(6, 3); + AFL_VERIFY(arrSlice->GetRecordsCount() == 3); + AFL_VERIFY(arrSlice->GetScalar(0)->ToString() == "abcd6"); + const TString arrString = PrepareToCompare(arrSlice->GetChunkedArray()->ToString()); + AFL_VERIFY(arrString == "[[\"abcd6\"],[null],[\"abcde8\"]]")("string", arrString); + } + } +}; diff --git a/ydb/core/formats/arrow/accessor/sparsed/ut/ya.make b/ydb/core/formats/arrow/accessor/sparsed/ut/ya.make new file mode 100644 index 0000000000..b8e0ee50de --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sparsed/ut/ya.make @@ -0,0 +1,14 @@ +UNITTEST_FOR(ydb/core/formats/arrow/accessor/sparsed) + +SIZE(SMALL) + +PEERDIR( +) + +YQL_LAST_ABI_VERSION() + +SRCS( + ut_sparsed.cpp +) + +END() diff --git a/ydb/core/formats/arrow/accessor/sparsed/ya.make b/ydb/core/formats/arrow/accessor/sparsed/ya.make index c68f5f84f9..62eb54f657 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/ya.make +++ b/ydb/core/formats/arrow/accessor/sparsed/ya.make @@ -13,3 +13,7 @@ SRCS( ) END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp new file mode 100644 index 0000000000..759b161809 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp @@ -0,0 +1,143 @@ +#include "accessor.h" + +#include <ydb/core/formats/arrow/accessor/composite_serial/accessor.h> +#include <ydb/core/formats/arrow/accessor/plain/constructor.h> +#include <ydb/core/formats/arrow/save_load/loader.h> +#include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/formats/arrow/splitter/simple.h> + +#include <ydb/library/formats/arrow/protos/accessor.pb.h> +#include <ydb/library/formats/arrow/simple_arrays_cache.h> + +#include <yql/essentials/types/binary_json/format.h> +#include <yql/essentials/types/binary_json/write.h> + +namespace NKikimr::NArrow::NAccessor { + +TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(const std::shared_ptr<IChunkedArray>& sourceArray, + const std::shared_ptr<NSubColumns::IDataAdapter>& adapter, const NSubColumns::TSettings& settings) { + AFL_VERIFY(adapter); + AFL_VERIFY(sourceArray); + NSubColumns::TDataBuilder builder(sourceArray->GetDataType(), settings); + IChunkedArray::TReader reader(sourceArray); + std::vector<std::shared_ptr<arrow::Array>> storage; + for (ui32 i = 0; i < reader.GetRecordsCount();) { + auto address = reader.GetReadChunk(i); + storage.emplace_back(address.GetArray()); + adapter->AddDataToBuilders(address.GetArray(), builder); + i += address.GetArray()->length(); + AFL_VERIFY(i <= reader.GetRecordsCount()); + } + return builder.Finish(); +} + +TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings) + : TBase(recordsCount, EType::SubColumnsArray, type) + , ColumnsData(NSubColumns::TColumnsData::BuildEmpty(recordsCount)) + , OthersData(NSubColumns::TOthersData::BuildEmpty()) + , Settings(settings) { +} + +TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, + const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings) + : TBase(recordsCount, EType::SubColumnsArray, type) + , ColumnsData(std::move(columns)) + , OthersData(std::move(others)) + , Settings(settings) { +} + +TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const { + TString blobData; + NKikimrArrowAccessorProto::TSubColumnsAccessor proto; + std::vector<TString> blobRanges; + if (ColumnsData.GetStats().GetColumnsCount()) { + blobRanges.emplace_back(ColumnsData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer())); + proto.SetColumnStatsSize(blobRanges.back().size()); + } else { + proto.SetColumnStatsSize(0); + } + + if (OthersData.GetStats().GetColumnsCount()) { + blobRanges.emplace_back(OthersData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer())); + proto.SetOtherStatsSize(blobRanges.back().size()); + } else { + proto.SetOtherStatsSize(0); + } + ui32 columnIdx = 0; + for (auto&& i : ColumnsData.GetRecords()->GetColumns()) { + TChunkConstructionData cData(GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer()); + blobRanges.emplace_back( + ColumnsData.GetStats().GetAccessorConstructor(columnIdx).SerializeToString(i, cData)); + auto* cInfo = proto.AddKeyColumns(); + cInfo->SetSize(blobRanges.back().size()); + ++columnIdx; + } + + if (OthersData.GetRecords()->GetRecordsCount()) { + for (auto&& i : OthersData.GetRecords()->GetColumns()) { + TChunkConstructionData cData(i->GetRecordsCount(), nullptr, i->GetDataType(), externalInfo.GetDefaultSerializer()); + blobRanges.emplace_back(NPlain::TConstructor().SerializeToString(i, cData)); + auto* cInfo = proto.AddOtherColumns(); + cInfo->SetSize(blobRanges.back().size()); + } + } + proto.SetOtherRecordsCount(OthersData.GetRecords()->GetRecordsCount()); + + ui64 blobsSize = 0; + for (auto&& i : blobRanges) { + blobsSize += i.size(); + } + + const TString protoString = proto.SerializeAsString(); + TString result; + TStringOutput so(result); + so.Reserve(protoString.size() + sizeof(ui32) + blobsSize); + const ui32 protoSize = protoString.size(); + so.Write(&protoSize, sizeof(protoSize)); + so.Write(protoString.data(), protoSize); + for (auto&& s : blobRanges) { + so.Write(s.data(), s.size()); + } + so.Finish(); + return result; +} + +IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData( + const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const { + auto it = BuildUnorderedIterator(); + auto builder = NArrow::MakeBuilder(GetDataType()); + for (ui32 recordIndex = 0; recordIndex < GetRecordsCount(); ++recordIndex) { + NJson::TJsonValue value; + auto onStartRecord = [&](const ui32 index) { + AFL_VERIFY(recordIndex == index)("count", recordIndex)("index", index); + }; + auto onFinishRecord = [&]() { + auto str = value.GetStringRobust(); + // NArrow::Append<arrow::BinaryType>(*builder, arrow::util::string_view(str.data(), str.size())); + // + auto bJson = NBinaryJson::SerializeToBinaryJson(value.GetStringRobust()); + if (const TString* val = std::get_if<TString>(&bJson)) { + AFL_VERIFY(false)("error", *val); + } else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) { + if (value.IsNull() || !value.IsDefined()) { + TStatusValidator::Validate(builder->AppendNull()); + } else { + NArrow::Append<arrow::BinaryType>(*builder, arrow::util::string_view(val->data(), val->size())); + } + } else { + AFL_VERIFY(false); + } + }; + auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) { + if (isColumn) { + value.InsertValue(ColumnsData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size())); + } else { + value.InsertValue(OthersData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size())); + } + }; + it.ReadRecord(recordIndex, onStartRecord, onRecordKV, onFinishRecord); + } + return TLocalDataAddress(NArrow::FinishBuilder(std::move(builder)), 0, 0); +} + +} // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h new file mode 100644 index 0000000000..1b4cb7fec9 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h @@ -0,0 +1,89 @@ +#pragma once +#include "columns_storage.h" +#include "data_extractor.h" +#include "iterators.h" +#include "others_storage.h" +#include "settings.h" + +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/common/container.h> + +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/common/chunk_data.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h> + +namespace NKikimr::NArrow::NAccessor { + +class TSubColumnsArray: public IChunkedArray { +private: + using TBase = IChunkedArray; + NSubColumns::TColumnsData ColumnsData; + NSubColumns::TOthersData OthersData; + const NSubColumns::TSettings Settings; + +protected: + virtual NJson::TJsonValue DoDebugJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("columns_data", ColumnsData.DebugJson()); + result.InsertValue("others_data", OthersData.DebugJson()); + result.InsertValue("settings", Settings.DebugJson()); + return result; + } + virtual ui32 DoGetNullsCount() const override { + AFL_VERIFY(false); + return 0; + } + virtual ui32 DoGetValueRawBytes() const override { + AFL_VERIFY(false); + return 0; + } + + virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override { + return nullptr; + } + + virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; + virtual std::optional<ui64> DoGetRawSize() const override { + return ColumnsData.GetRawSize() + OthersData.GetRawSize(); + } + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override { + return std::make_shared<TSubColumnsArray>( + ColumnsData.Slice(offset, count), OthersData.Slice(offset, count, Settings), GetDataType(), count, Settings); + } + +public: + std::shared_ptr<NSubColumns::TReadIteratorOrderedKeys> BuildOrderedIterator() const { + return std::make_shared<NSubColumns::TReadIteratorOrderedKeys>(ColumnsData, OthersData); + } + + NSubColumns::TReadIteratorUnorderedKeys BuildUnorderedIterator() const { + return NSubColumns::TReadIteratorUnorderedKeys(ColumnsData, OthersData); + } + + const NSubColumns::TColumnsData& GetColumnsData() const { + return ColumnsData; + } + const NSubColumns::TOthersData& GetOthersData() const { + return OthersData; + } + + TString SerializeToString(const TChunkConstructionData& externalInfo) const; + + TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, const std::shared_ptr<arrow::DataType>& type, + const ui32 recordsCount, const NSubColumns::TSettings& settings); + + static TConclusion<std::shared_ptr<TSubColumnsArray>> Make(const std::shared_ptr<IChunkedArray>& sourceArray, + const std::shared_ptr<NSubColumns::IDataAdapter>& adapter, const NSubColumns::TSettings& settings); + + TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings); + + virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override { + return nullptr; + } +}; + +} // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp new file mode 100644 index 0000000000..4a98bbfb7b --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp @@ -0,0 +1,21 @@ +#include "columns_storage.h" + +namespace NKikimr::NArrow::NAccessor::NSubColumns { +TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const { + auto sliceRecords = Records->Slice(offset, count); + if (sliceRecords.GetRecordsCount()) { + TDictStats::TBuilder builder; + ui32 idx = 0; + for (auto&& i : sliceRecords.GetColumns()) { + AFL_VERIFY(Stats.GetColumnName(idx) == sliceRecords.GetSchema()->field(idx)->name()); + builder.Add(Stats.GetColumnName(idx), i->GetRecordsCount() - i->GetNullsCount(), i->GetValueRawBytes(), i->GetType()); + ++idx; + } + return TColumnsData(builder.Finish(), std::make_shared<TGeneralContainer>(std::move(sliceRecords))); + + } else { + return TColumnsData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(0)); + } +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h new file mode 100644 index 0000000000..b8eeb35e3f --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h @@ -0,0 +1,140 @@ +#pragma once + +#include "stats.h" + +#include <ydb/core/formats/arrow/common/container.h> + +#include <ydb/library/accessor/accessor.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TColumnsData { +private: + TDictStats Stats; + YDB_READONLY_DEF(std::shared_ptr<TGeneralContainer>, Records); + +public: + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("stats", Stats.DebugJson()); + result.InsertValue("records", Records->DebugJson(true)); + return result; + } + + TColumnsData Slice(const ui32 offset, const ui32 count) const; + + static TColumnsData BuildEmpty(const ui32 recordsCount) { + return TColumnsData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(recordsCount)); + } + + ui64 GetRawSize() const { + return Records->GetRawSizeVerified(); + } + + class TIterator { + private: + ui32 KeyIndex; + std::shared_ptr<IChunkedArray> GlobalChunkedArray; + std::shared_ptr<arrow::StringArray> CurrentArrayData; + std::optional<IChunkedArray::TFullChunkedArrayAddress> FullArrayAddress; + std::optional<IChunkedArray::TFullDataAddress> ChunkAddress; + ui32 CurrentIndex = 0; + + void InitArrays() { + while (CurrentIndex < GlobalChunkedArray->GetRecordsCount()) { + if (!FullArrayAddress || !FullArrayAddress->GetAddress().Contains(CurrentIndex)) { + FullArrayAddress = GlobalChunkedArray->GetArray(FullArrayAddress, CurrentIndex, GlobalChunkedArray); + ChunkAddress = std::nullopt; + } + const ui32 localIndex = FullArrayAddress->GetAddress().GetLocalIndex(CurrentIndex); + ChunkAddress = FullArrayAddress->GetArray()->GetChunk(ChunkAddress, localIndex); + AFL_VERIFY(ChunkAddress->GetArray()->type()->id() == arrow::utf8()->id()); + CurrentArrayData = std::static_pointer_cast<arrow::StringArray>(ChunkAddress->GetArray()); + if (FullArrayAddress->GetArray()->GetType() == IChunkedArray::EType::Array) { + if (CurrentArrayData->IsNull(localIndex)) { + Next(); + } + break; + } else if (FullArrayAddress->GetArray()->GetType() == IChunkedArray::EType::SparsedArray) { + if (CurrentArrayData->IsNull(localIndex) && + std::static_pointer_cast<TSparsedArray>(FullArrayAddress->GetArray())->GetDefaultValue() == nullptr) { + CurrentIndex += ChunkAddress->GetArray()->length(); + } else { + break; + } + } else { + AFL_VERIFY(false)("type", FullArrayAddress->GetArray()->GetType()); + } + } + AFL_VERIFY(CurrentIndex <= GlobalChunkedArray->GetRecordsCount()); + } + + public: + TIterator(const ui32 keyIndex, const std::shared_ptr<IChunkedArray>& chunkedArray) + : KeyIndex(keyIndex) + , GlobalChunkedArray(chunkedArray) { + InitArrays(); + } + + ui32 GetCurrentRecordIndex() const { + return CurrentIndex; + } + + ui32 GetKeyIndex() const { + return KeyIndex; + } + + std::string_view GetValue() const { + auto view = CurrentArrayData->GetView(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex)); + return std::string_view(view.data(), view.size()); + } + + bool HasValue() const { + return !CurrentArrayData->IsNull(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex)); + } + + bool IsValid() const { + return CurrentIndex < GlobalChunkedArray->GetRecordsCount(); + } + + bool Next() { + AFL_VERIFY(IsValid()); + while (true) { + ++CurrentIndex; + if (ChunkAddress->GetAddress().Contains(CurrentIndex)) { + if (CurrentArrayData->IsNull(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex))) { + continue; + } + return true; + } else if (CurrentIndex == GlobalChunkedArray->GetRecordsCount()) { + return false; + } else { + InitArrays(); + return IsValid(); + } + } + } + }; + + TIterator BuildIterator(const ui32 keyIndex) const { + return TIterator(keyIndex, Records->GetColumnVerified(keyIndex)); + } + + const TDictStats& GetStats() const { + return Stats; + } + + TColumnsData(const TDictStats& dict, const std::shared_ptr<TGeneralContainer>& data) + : Stats(dict) + , Records(data) { + AFL_VERIFY(Records->num_columns() == Stats.GetColumnsCount()); + for (auto&& i : Records->GetColumns()) { + AFL_VERIFY(i->GetDataType()->id() == arrow::utf8()->id()); + } + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp new file mode 100644 index 0000000000..f8a27fce34 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp @@ -0,0 +1,106 @@ +#include "accessor.h" +#include "constructor.h" + +#include <ydb/core/formats/arrow/accessor/composite_serial/accessor.h> +#include <ydb/core/formats/arrow/accessor/plain/constructor.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(externalInfo.GetDefaultValue() == nullptr); + return std::make_shared<TSubColumnsArray>(externalInfo.GetColumnType(), externalInfo.GetRecordsCount(), Settings); +} + +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const { + TStringInput si(originalData); + ui32 protoSize; + si.Read(&protoSize, sizeof(protoSize)); + ui64 currentIndex = sizeof(protoSize); + NKikimrArrowAccessorProto::TSubColumnsAccessor proto; + if (!proto.ParseFromArray(originalData.data() + currentIndex, protoSize)) { + return TConclusionStatus::Fail("cannot parse proto"); + } + currentIndex += protoSize; + TDictStats columnStats = [&]() { + if (proto.GetColumnStatsSize()) { + std::shared_ptr<arrow::RecordBatch> rbColumnStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize( + TString(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetStatsSchema())); + return TDictStats(rbColumnStats); + } else { + return TDictStats::BuildEmpty(); + } + }(); + currentIndex += proto.GetColumnStatsSize(); + TDictStats otherStats = [&]() { + if (proto.GetOtherStatsSize()) { + std::shared_ptr<arrow::RecordBatch> rbOtherStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize( + TString(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetStatsSchema())); + return TDictStats(rbOtherStats); + } else { + return TDictStats::BuildEmpty(); + } + }(); + currentIndex += proto.GetOtherStatsSize(); + + std::shared_ptr<TGeneralContainer> columnKeysContainer; + { + std::vector<std::shared_ptr<IChunkedArray>> columns; + auto schema = columnStats.BuildColumnsSchema(); + AFL_VERIFY(columnStats.GetColumnsCount() == (ui32)proto.GetKeyColumns().size())("schema", columnStats.GetColumnsCount())( + "proto", proto.GetKeyColumns().size()); + for (ui32 i = 0; i < (ui32)proto.GetKeyColumns().size(); ++i) { + std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>( + externalInfo.GetDefaultSerializer(), columnStats.GetAccessorConstructor(i), schema->field(i), nullptr, 0); + std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk( + externalInfo.GetRecordsCount(), originalData.substr(currentIndex, proto.GetKeyColumns(i).GetSize())) }; + columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(externalInfo.GetRecordsCount(), columnLoader, std::move(chunks))); + currentIndex += proto.GetKeyColumns(i).GetSize(); + } + columnKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move(columns)); + } + TOthersData otherData = TOthersData::BuildEmpty(); + if (proto.GetOtherColumns().size() && proto.GetOtherRecordsCount()) { + std::shared_ptr<TGeneralContainer> otherKeysContainer; + std::vector<std::shared_ptr<IChunkedArray>> columns; + AFL_VERIFY(TOthersData::GetSchema()->num_fields() == proto.GetOtherColumns().size())("proto", proto.GetOtherColumns().size())( + "schema", TOthersData::GetSchema()->num_fields()); + auto schema = TOthersData::GetSchema(); + for (ui32 i = 0; i < (ui32)proto.GetOtherColumns().size(); ++i) { + std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>( + externalInfo.GetDefaultSerializer(), std::make_shared<NPlain::TConstructor>(), schema->field(i), nullptr, 0); + std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk( + proto.GetOtherRecordsCount(), originalData.substr(currentIndex, proto.GetOtherColumns(i).GetSize())) }; + columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount(), columnLoader, std::move(chunks))); + currentIndex += proto.GetOtherColumns(i).GetSize(); + } + otherKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move(columns)); + otherData = TOthersData(otherStats, otherKeysContainer); + } + TColumnsData columnData(columnStats, columnKeysContainer); + return std::make_shared<TSubColumnsArray>( + std::move(columnData), std::move(otherData), externalInfo.GetColumnType(), externalInfo.GetRecordsCount(), Settings); +} + +NKikimrArrowAccessorProto::TConstructor TConstructor::DoSerializeToProto() const { + NKikimrArrowAccessorProto::TConstructor result; + *result.MutableSubColumns()->MutableSettings() = Settings.SerializeToProto(); + return result; +} + +bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) { + return Settings.DeserializeFromProto(proto.GetSubColumns().GetSettings()); +} + +TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( + const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& /*externalInfo*/) const { + return NAccessor::TSubColumnsArray::Make(originalData, DataExtractor, Settings).DetachResult(); +} + +TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + const std::shared_ptr<TSubColumnsArray> arr = std::static_pointer_cast<TSubColumnsArray>(columnData); + return arr->SerializeToString(externalInfo); +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/constructor.h b/ydb/core/formats/arrow/accessor/sub_columns/constructor.h new file mode 100644 index 0000000000..1f2c1d20b4 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/constructor.h @@ -0,0 +1,56 @@ +#pragma once +#include "data_extractor.h" + +#include <ydb/core/formats/arrow/accessor/abstract/constructor.h> + +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TConstructor: public IConstructor { +private: + using TBase = IConstructor; + std::shared_ptr<IDataAdapter> DataExtractor = std::make_shared<TFirstLevelSchemaData>(); + TSettings Settings; + +public: + static TString GetClassNameStatic() { + return TGlobalConst::SubColumnsDataAccessorName; + } + const TSettings& GetSettings() const { + return Settings; + } + +private: + static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic()); + + virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override { + return true; + } + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override; + + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoDeserializeFromString( + const TString& originalData, const TChunkConstructionData& externalInfo) const override; + virtual TString DoSerializeToString( + const std::shared_ptr<IChunkedArray>& chunkedArray, const TChunkConstructionData& externalInfo) const override; + virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override; + virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) override; + virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstructDefault(const TChunkConstructionData& externalInfo) const override; + +public: + TConstructor() + : TBase(IChunkedArray::EType::SubColumnsArray) { + } + + TConstructor(const TSettings& settings) + : TBase(IChunkedArray::EType::SubColumnsArray) + , Settings(settings) { + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp new file mode 100644 index 0000000000..3a1831076a --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp @@ -0,0 +1,59 @@ +#include "data_extractor.h" + +#include <util/string/split.h> +#include <util/string/vector.h> +#include <yql/essentials/types/binary_json/format.h> +#include <yql/essentials/types/binary_json/read.h> +#include <yql/essentials/types/binary_json/write.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +TConclusionStatus TFirstLevelSchemaData::DoAddDataToBuilders( + const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const noexcept { + if (sourceArray->type()->id() != arrow::binary()->id()) { + return TConclusionStatus::Fail("incorrect base type for subcolumns schema usage"); + } + + auto arr = std::static_pointer_cast<arrow::StringArray>(sourceArray); + for (ui32 i = 0; i < arr->length(); ++i) { + const auto view = arr->GetView(i); + if (view.size() && !arr->IsNull(i)) { + // NBinaryJson::TBinaryJson bJson(view.data(), view.size()); + // auto bJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(view.data(), view.size())); + // const NBinaryJson::TBinaryJson* bJsonParsed = std::get_if<NBinaryJson::TBinaryJson>(&bJson); + // AFL_VERIFY(bJsonParsed)("error", *std::get_if<TString>(&bJson))("json", TStringBuf(view.data(), view.size())); + // const NBinaryJson::TBinaryJson* bJsonParsed = &bJson; + auto reader = NBinaryJson::TBinaryJsonReader::Make(TStringBuf(view.data(), view.size())); + auto cursor = reader->GetRootCursor(); + if (cursor.GetType() != NBinaryJson::EContainerType::Object) { + return TConclusionStatus::Fail("incorrect json data"); + } + auto it = cursor.GetObjectIterator(); + while (it.HasNext()) { + auto [key, value] = it.Next(); + if (key.GetType() != NBinaryJson::EEntryType::String) { + continue; + } + if (value.GetType() == NBinaryJson::EEntryType::String) { + dataBuilder.AddKV(key.GetString(), value.GetString()); + } else if (value.GetType() == NBinaryJson::EEntryType::Number) { + dataBuilder.AddKVOwn(key.GetString(), ::ToString(value.GetNumber())); + } else if (value.GetType() == NBinaryJson::EEntryType::BoolFalse) { + dataBuilder.AddKVOwn(key.GetString(), "0"); + } else if (value.GetType() == NBinaryJson::EEntryType::BoolTrue) { + dataBuilder.AddKVOwn(key.GetString(), "1"); + } else { + continue; + } + } + } + dataBuilder.StartNextRecord(); + } + return TConclusionStatus::Success(); +} + +TConclusionStatus IDataAdapter::AddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const noexcept { + return DoAddDataToBuilders(sourceArray, dataBuilder); +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h new file mode 100644 index 0000000000..272414d0a4 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h @@ -0,0 +1,31 @@ +#pragma once +#include "direct_builder.h" + +#include <ydb/core/formats/arrow/arrow_helpers.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class IDataAdapter { +private: + virtual TConclusionStatus DoAddDataToBuilders( + const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const noexcept = 0; + +public: + virtual ~IDataAdapter() = default; + + TConclusionStatus AddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const noexcept; +}; + +class TFirstLevelSchemaData: public IDataAdapter { +private: + virtual TConclusionStatus DoAddDataToBuilders( + const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const noexcept override; + +public: +}; + +} // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp new file mode 100644 index 0000000000..7a220c175f --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp @@ -0,0 +1,110 @@ +#include "accessor.h" +#include "columns_storage.h" +#include "direct_builder.h" + +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +void TColumnElements::BuildSparsedAccessor(const ui32 recordsCount) { + AFL_VERIFY(!Accessor); + auto recordsBuilder = TSparsedArray::MakeBuilderUtf8(RecordIndexes.size(), DataSize); + for (ui32 idx = 0; idx < RecordIndexes.size(); ++idx) { + recordsBuilder.AddRecord(RecordIndexes[idx], Values[idx]); + } + Accessor = recordsBuilder.Finish(recordsCount); +} + +void TColumnElements::BuildPlainAccessor(const ui32 recordsCount) { + AFL_VERIFY(!Accessor); + auto builder = TTrivialArray::MakeBuilderUtf8(recordsCount, DataSize); + for (auto it = RecordIndexes.begin(); it != RecordIndexes.end(); ++it) { + builder.AddRecord(*it, Values[it - RecordIndexes.begin()]); + } + Accessor = builder.Finish(recordsCount); +} + +std::shared_ptr<TSubColumnsArray> TDataBuilder::Finish() { + std::map<ui64, std::vector<TColumnElements*>> elementsBySize; + ui64 sumSize = 0; + for (auto&& i : Elements) { + elementsBySize[i.second.GetDataSize()].emplace_back(&i.second); + sumSize += i.second.GetDataSize(); + } + ui32 columnAccessorsCount = 0; + std::vector<TColumnElements*> columnElements; + std::vector<TColumnElements*> otherElements; + ui64 columnsSize = 0; + for (auto rIt = elementsBySize.rbegin(); rIt != elementsBySize.rend(); ++rIt) { + for (auto&& i : rIt->second) { + AFL_VERIFY(sumSize >= columnsSize)("sum", sumSize)("columns", columnsSize); + if (columnAccessorsCount < Settings.GetColumnsLimit() && + (!sumSize || 1.0 * (sumSize - columnsSize) / sumSize > Settings.GetOthersAllowedFraction())) { + columnsSize += rIt->first; + columnElements.emplace_back(i); + ++columnAccessorsCount; + } else { + otherElements.emplace_back(i); + } + } + } + const auto predSortElements = [](const TColumnElements* l, const TColumnElements* r) { + return l->GetKeyName() < r->GetKeyName(); + }; + std::sort(columnElements.begin(), columnElements.end(), predSortElements); + std::sort(otherElements.begin(), otherElements.end(), predSortElements); + TDictStats columnStats = BuildStats(columnElements, Settings, CurrentRecordIndex); + { + ui32 columnIdx = 0; + for (auto&& i : columnElements) { + switch (columnStats.GetAccessorType(columnIdx)) { + case IChunkedArray::EType::Array: + i->BuildPlainAccessor(CurrentRecordIndex); + break; + case IChunkedArray::EType::SparsedArray: + i->BuildSparsedAccessor(CurrentRecordIndex); + break; + case IChunkedArray::EType::Undefined: + case IChunkedArray::EType::SerializedChunkedArray: + case IChunkedArray::EType::SubColumnsArray: + case IChunkedArray::EType::ChunkedArray: + AFL_VERIFY(false); + } + ++columnIdx; + } + } + + TOthersData rbOthers = MergeOthers(otherElements, CurrentRecordIndex); + + auto records = std::make_shared<TGeneralContainer>(CurrentRecordIndex); + for (auto&& i : columnElements) { + records->AddField(std::make_shared<arrow::Field>(std::string(i->GetKeyName()), arrow::utf8()), i->GetAccessorVerified()).Validate(); + } + TColumnsData cData(std::move(columnStats), std::move(records)); + return std::make_shared<TSubColumnsArray>(std::move(cData), std::move(rbOthers), Type, CurrentRecordIndex, Settings); +} + +TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& otherKeys, const ui32 recordsCount) const { + std::vector<THeapElements> heap; + ui32 idx = 0; + for (auto&& i : otherKeys) { + heap.emplace_back(i, idx); + AFL_VERIFY(heap.back().IsValid()); + ++idx; + } + std::make_heap(heap.begin(), heap.end()); + auto othersBuilder = TOthersData::MakeMergedBuilder(); + while (heap.size()) { + std::pop_heap(heap.begin(), heap.end()); + othersBuilder->Add(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValue()); + if (!heap.back().Next()) { + heap.pop_back(); + } else { + std::push_heap(heap.begin(), heap.end()); + } + } + return othersBuilder->Finish(TOthersData::TFinishContext(BuildStats(otherKeys, Settings, recordsCount))); +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h new file mode 100644 index 0000000000..33430589cc --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.h @@ -0,0 +1,149 @@ +#pragma once +#include "others_storage.h" +#include "settings.h" +#include "stats.h" + +#include <ydb/core/formats/arrow/arrow_helpers.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> + +namespace NKikimr::NArrow::NAccessor { +class TSubColumnsArray; +} + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TColumnElements { +private: + YDB_READONLY_DEF(TStringBuf, KeyName); + YDB_READONLY_DEF(std::deque<TStringBuf>, Values); + std::vector<TString> ValuesStorage; + YDB_READONLY_DEF(std::vector<ui32>, RecordIndexes); + YDB_READONLY(ui32, DataSize, 0); + std::shared_ptr<IChunkedArray> Accessor; + +public: + const std::shared_ptr<IChunkedArray>& GetAccessorVerified() const { + AFL_VERIFY(!!Accessor); + return Accessor; + } + + void BuildSparsedAccessor(const ui32 recordsCount); + void BuildPlainAccessor(const ui32 recordsCount); + + TColumnElements(const TStringBuf key) + : KeyName(key) { + } + + void AddData(const TStringBuf sb, const ui32 index) { + Values.emplace_back(sb); + RecordIndexes.emplace_back(index); + DataSize += sb.size(); + } + + void AddDataToOwn(const TString& value, const ui32 index) { + ValuesStorage.emplace_back(value); + AddData(TStringBuf(value.data(), value.size()), index); + } +}; + +class TDataBuilder { +private: + ui32 CurrentRecordIndex = 0; + THashMap<TStringBuf, TColumnElements> Elements; + std::deque<TString> Storage; + const std::shared_ptr<arrow::DataType> Type; + const TSettings Settings; + +public: + TDataBuilder(const std::shared_ptr<arrow::DataType>& type, const TSettings& settings) + : Type(type) + , Settings(settings) + { + } + + void StartNextRecord() { + ++CurrentRecordIndex; + } + + void AddKV(const TStringBuf key, const TStringBuf value) { + auto itElements = Elements.find(key); + if (itElements == Elements.end()) { + itElements = Elements.emplace(key, key).first; + } + itElements->second.AddData(value, CurrentRecordIndex); + } + + void AddKVOwn(const TStringBuf key, const TString& value) { + Storage.emplace_back(value); + auto itElements = Elements.find(key); + if (itElements == Elements.end()) { + itElements = Elements.emplace(key, key).first; + } + itElements->second.AddData(value, CurrentRecordIndex); + } + + class THeapElements { + private: + const TColumnElements* Elements; + ui32 Index = 0; + ui32 KeyIndex = 0; + + public: + THeapElements(const TColumnElements* elements, const ui32 keyIndex) + : Elements(elements) + , KeyIndex(keyIndex) { + AFL_VERIFY(Elements); + } + + ui32 GetRecordIndex() const { + return Elements->GetRecordIndexes()[Index]; + } + + TStringBuf GetKey() const { + return Elements->GetKeyName(); + } + + ui32 GetKeyIndex() const { + return KeyIndex; + } + + TStringBuf GetValue() const { + return Elements->GetValues()[Index]; + } + + bool operator<(const THeapElements& item) const { + if (Elements->GetRecordIndexes()[Index] == item.Elements->GetRecordIndexes()[item.Index]) { + return item.Elements->GetKeyName() < Elements->GetKeyName(); + } else { + return item.Elements->GetRecordIndexes()[item.Index] < Elements->GetRecordIndexes()[Index]; + } + } + + bool IsValid() const { + return Index < Elements->GetRecordIndexes().size(); + } + + bool Next() { + return ++Index < Elements->GetRecordIndexes().size(); + } + }; + + TDictStats BuildStats(const std::vector<TColumnElements*>& keys, const TSettings& settings, const ui32 recordsCount) const { + auto builder = TDictStats::MakeBuilder(); + for (auto&& i : keys) { + builder.Add(i->GetKeyName(), i->GetRecordIndexes().size(), i->GetDataSize(), + settings.IsSparsed(i->GetRecordIndexes().size(), recordsCount) ? IChunkedArray::EType::SparsedArray + : IChunkedArray::EType::Array); + } + return builder.Finish(); + } + + TOthersData MergeOthers(const std::vector<TColumnElements*>& otherKeys, const ui32 recordsCount) const; + + std::shared_ptr<TSubColumnsArray> Finish(); +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/iterators.cpp b/ydb/core/formats/arrow/accessor/sub_columns/iterators.cpp new file mode 100644 index 0000000000..fd7aa28b7f --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/iterators.cpp @@ -0,0 +1,5 @@ +#include "iterators.h" + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/iterators.h b/ydb/core/formats/arrow/accessor/sub_columns/iterators.h new file mode 100644 index 0000000000..d80557e46c --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/iterators.h @@ -0,0 +1,277 @@ +#pragma once +#include "columns_storage.h" +#include "others_storage.h" + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TGeneralIterator { +private: + std::variant<TColumnsData::TIterator, TOthersData::TIterator> Iterator; + std::optional<ui32> RemappedKey; + std::vector<ui32> RemapKeys; + +public: + TGeneralIterator(TColumnsData::TIterator&& iterator, const std::optional<ui32> remappedKey = {}) + : Iterator(iterator) + , RemappedKey(remappedKey) { + } + TGeneralIterator(TOthersData::TIterator&& iterator, const std::vector<ui32>& remapKeys = {}) + : Iterator(iterator) + , RemapKeys(remapKeys) { + } + bool IsColumnKey() const { + struct TVisitor { + bool operator()(const TOthersData::TIterator& /*iterator*/) { + return false; + } + bool operator()(const TColumnsData::TIterator& /*iterator*/) { + return true; + } + }; + TVisitor visitor; + return std::visit(visitor, Iterator); + } + bool Next() { + struct TVisitor { + bool operator()(TOthersData::TIterator& iterator) { + return iterator.Next(); + } + bool operator()(TColumnsData::TIterator& iterator) { + return iterator.Next(); + } + }; + return std::visit(TVisitor(), Iterator); + } + bool IsValid() const { + struct TVisitor { + bool operator()(const TOthersData::TIterator& iterator) { + return iterator.IsValid(); + } + bool operator()(const TColumnsData::TIterator& iterator) { + return iterator.IsValid(); + } + }; + return std::visit(TVisitor(), Iterator); + } + ui32 GetRecordIndex() const { + struct TVisitor { + ui32 operator()(const TOthersData::TIterator& iterator) { + return iterator.GetRecordIndex(); + } + ui32 operator()(const TColumnsData::TIterator& iterator) { + return iterator.GetCurrentRecordIndex(); + } + }; + return std::visit(TVisitor(), Iterator); + } + ui32 GetKeyIndex() const { + struct TVisitor { + private: + const TGeneralIterator& Owner; + + public: + TVisitor(const TGeneralIterator& owner) + : Owner(owner) { + } + ui32 operator()(const TOthersData::TIterator& iterator) { + return Owner.RemapKeys.size() ? Owner.RemapKeys[iterator.GetKeyIndex()] : iterator.GetKeyIndex(); + } + ui32 operator()(const TColumnsData::TIterator& iterator) { + return Owner.RemappedKey.value_or(iterator.GetKeyIndex()); + } + }; + return std::visit(TVisitor(*this), Iterator); + } + std::string_view GetValue() const { + struct TVisitor { + std::string_view operator()(const TOthersData::TIterator& iterator) { + return iterator.GetValue(); + } + std::string_view operator()(const TColumnsData::TIterator& iterator) { + return iterator.GetValue(); + } + }; + return std::visit(TVisitor(), Iterator); + } + + bool HasValue() const { + struct TVisitor { + bool operator()(const TOthersData::TIterator& iterator) { + return iterator.HasValue(); + } + bool operator()(const TColumnsData::TIterator& iterator) { + return iterator.HasValue(); + } + }; + return std::visit(TVisitor(), Iterator); + } + + bool operator<(const TGeneralIterator& item) const { + return std::tuple(item.GetRecordIndex(), item.GetKeyIndex()) < std::tuple(GetRecordIndex(), GetKeyIndex()); + } +}; + +class TReadIteratorUnorderedKeys { +private: + TColumnsData ColumnsData; + TOthersData OthersData; + std::vector<TGeneralIterator> Iterators; + std::vector<TGeneralIterator*> SortedIterators; + +public: + bool IsValid() const { + return SortedIterators.size(); + } + + TReadIteratorUnorderedKeys(const TColumnsData& columnsData, const TOthersData& othersData) + : ColumnsData(columnsData) + , OthersData(othersData) { + for (ui32 i = 0; i < ColumnsData.GetStats().GetColumnsCount(); ++i) { + Iterators.emplace_back(ColumnsData.BuildIterator(i)); + } + Iterators.emplace_back(OthersData.BuildIterator()); + for (auto&& i : Iterators) { + SortedIterators.emplace_back(&i); + } + auto checkIterator = [](const TGeneralIterator* it) { + return !it->IsValid(); + }; + SortedIterators.erase(std::remove_if(SortedIterators.begin(), SortedIterators.end(), checkIterator), SortedIterators.end()); + } + + template <class TStartRecordActor, class TKVActor, class TFinishRecordActor> + void ReadRecord(const ui32 recordIndex, const TStartRecordActor& startRecordActor, const TKVActor& kvActor, + const TFinishRecordActor& finishRecordActor) { + startRecordActor(recordIndex); + for (ui32 iIter = 0; iIter < SortedIterators.size();) { + auto& itColumn = *SortedIterators[iIter]; + AFL_VERIFY(recordIndex <= itColumn.GetRecordIndex()); + while (itColumn.GetRecordIndex() == recordIndex) { + if (itColumn.HasValue()) { + kvActor(itColumn.GetKeyIndex(), itColumn.GetValue(), itColumn.IsColumnKey()); + } + if (!itColumn.Next()) { + break; + } + } + if (!itColumn.IsValid()) { + std::swap(SortedIterators[iIter], SortedIterators[SortedIterators.size() - 1]); + SortedIterators.pop_back(); + } else { + AFL_VERIFY(recordIndex < itColumn.GetRecordIndex()); + ++iIter; + } + } + finishRecordActor(); + } +}; + +class TReadIteratorOrderedKeys { +private: + TColumnsData ColumnsData; + TOthersData OthersData; + std::vector<TGeneralIterator> Iterators; + std::vector<TGeneralIterator*> SortedIterators; + class TKeyAddress { + private: + YDB_READONLY_DEF(std::string_view, Name); + YDB_READONLY(ui32, OriginalIndex, 0); + YDB_READONLY(bool, IsColumn, false); + + public: + TKeyAddress(const std::string_view& keyName, const ui32 keyIndex, const bool isColumn) + : Name(keyName) + , OriginalIndex(keyIndex) + , IsColumn(isColumn) { + } + + bool operator<(const TKeyAddress& item) const { + return Name < item.Name; + } + }; + + std::vector<TKeyAddress> Addresses; + +public: + bool IsValid() const { + return SortedIterators.size(); + } + + struct TIteratorsComparator { + bool operator()(const TGeneralIterator* l, const TGeneralIterator* r) { + return *l < *r; + } + }; + + TReadIteratorOrderedKeys(const TColumnsData& columnsData, const TOthersData& othersData) + : ColumnsData(columnsData) + , OthersData(othersData) { + for (ui32 i = 0; i < ColumnsData.GetStats().GetColumnsCount(); ++i) { + Addresses.emplace_back(ColumnsData.GetStats().GetColumnName(i), i, true); + } + for (ui32 i = 0; i < OthersData.GetStats().GetColumnsCount(); ++i) { + Addresses.emplace_back(OthersData.GetStats().GetColumnName(i), i, false); + } + std::sort(Addresses.begin(), Addresses.end()); + std::vector<ui32> remapColumns; + remapColumns.resize(ColumnsData.GetStats().GetColumnsCount()); + std::vector<ui32> remapOthers; + remapOthers.resize(OthersData.GetStats().GetColumnsCount()); + for (ui32 i = 0; i < Addresses.size(); ++i) { + if (i) { + AFL_VERIFY(Addresses[i].GetName() != Addresses[i - 1].GetName()); + } + if (Addresses[i].GetIsColumn()) { + remapColumns[Addresses[i].GetOriginalIndex()] = i; + } else { + remapOthers[Addresses[i].GetOriginalIndex()] = i; + } + } + for (ui32 i = 0; i < ColumnsData.GetStats().GetColumnsCount(); ++i) { + Iterators.emplace_back(ColumnsData.BuildIterator(i), remapColumns[i]); + } + Iterators.emplace_back(OthersData.BuildIterator(), remapOthers); + for (auto&& i : Iterators) { + SortedIterators.emplace_back(&i); + } + auto checkIterator = [](const TGeneralIterator* it) { + return !it->IsValid(); + }; + SortedIterators.erase(std::remove_if(SortedIterators.begin(), SortedIterators.end(), checkIterator), SortedIterators.end()); + std::make_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + } + + template <class TStartRecordActor, class TKVActor, class TFinishRecordActor> + void ReadRecord(const ui32 recordIndex, const TStartRecordActor& startRecordActor, const TKVActor& kvActor, + const TFinishRecordActor& finishRecordActor) { + while (SortedIterators.size()) { + while (SortedIterators.size() && SortedIterators.front()->GetRecordIndex() < recordIndex) { + std::pop_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + auto& itColumn = *SortedIterators.back(); + if (!itColumn.Next()) { + SortedIterators.pop_back(); + } else { + std::push_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + } + continue; + } + startRecordActor(recordIndex); + while (SortedIterators.size() && SortedIterators.front()->GetRecordIndex() == recordIndex) { + std::pop_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + auto& itColumn = *SortedIterators.back(); + kvActor(Addresses[itColumn.GetKeyIndex()].GetOriginalIndex(), itColumn.GetValue(), itColumn.IsColumnKey()); + if (!itColumn.Next()) { + SortedIterators.pop_back(); + } else { + std::push_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + } + } + finishRecordActor(); + return; + } + startRecordActor(recordIndex); + finishRecordActor(); + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/others_storage.cpp b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.cpp new file mode 100644 index 0000000000..0d7bafe12a --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.cpp @@ -0,0 +1,145 @@ +#include "others_storage.h" + +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> + +#include <ydb/library/formats/arrow/arrow_helpers.h> +#include <ydb/library/formats/arrow/simple_arrays_cache.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +TOthersData::TBuilderWithStats::TBuilderWithStats() { + Builders = NArrow::MakeBuilders(GetSchema()); + AFL_VERIFY(Builders.size() == 3); + AFL_VERIFY(Builders[0]->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Builders[1]->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Builders[2]->type()->id() == arrow::utf8()->id()); + RecordIndex = static_cast<arrow::UInt32Builder*>(Builders[0].get()); + KeyIndex = static_cast<arrow::UInt32Builder*>(Builders[1].get()); + Values = static_cast<arrow::StringBuilder*>(Builders[2].get()); +} + +void TOthersData::TBuilderWithStats::Add(const ui32 recordIndex, const ui32 keyIndex, const std::string_view value) { + AFL_VERIFY(Builders.size()); + if (StatsByKeyIndex.size() <= keyIndex) { + StatsByKeyIndex.resize((keyIndex + 1) * 2); + } + StatsByKeyIndex[keyIndex].AddValue(value); + if (!LastRecordIndex) { + LastRecordIndex = recordIndex; + LastKeyIndex = keyIndex; + } else { + AFL_VERIFY(*LastRecordIndex < recordIndex || (*LastRecordIndex == recordIndex/* && *LastKeyIndex < keyIndex*/)); + } + TStatusValidator::Validate(RecordIndex->Append(recordIndex)); + RTKeyIndexes.emplace_back(keyIndex); + TStatusValidator::Validate(Values->Append(value.data(), value.size())); + ++RecordsCount; +} + +TOthersData TOthersData::TBuilderWithStats::Finish(const TFinishContext& finishContext) { + AFL_VERIFY(Builders.size()); + auto arrRecordIndex = NArrow::FinishBuilder(std::move(Builders[0])); + auto arrValues = NArrow::FinishBuilder(std::move(Builders[2])); + AFL_VERIFY(arrRecordIndex->type()->id() == arrow::uint32()->id()); + auto arrRecordIndexValue = std::static_pointer_cast<arrow::UInt32Array>(arrRecordIndex); + std::optional<TDictStats> resultStats = finishContext.GetActualStats(); + if (finishContext.GetRemap()) { + for (ui32 idx = 0; idx < RTKeyIndexes.size(); ++idx) { + AFL_VERIFY(RTKeyIndexes[idx] < finishContext.GetRemap()->size()); + const ui32 newIndex = (*finishContext.GetRemap())[RTKeyIndexes[idx]]; + AFL_VERIFY(newIndex < finishContext.GetActualStats().GetColumnsCount()); + TStatusValidator::Validate(KeyIndex->Append(newIndex)); + if (idx) { + const ui32 predKeyIndex = (*finishContext.GetRemap())[RTKeyIndexes[idx - 1]]; + AFL_VERIFY((arrRecordIndexValue->Value(idx - 1) < arrRecordIndexValue->Value(idx)) || + (arrRecordIndexValue->Value(idx - 1) == arrRecordIndexValue->Value(idx) && predKeyIndex < newIndex))( + "r1", arrRecordIndexValue->Value(idx - 1))( + "r2", arrRecordIndexValue->Value(idx))("k1", predKeyIndex)("k2", newIndex); + } + } + } else { + for (ui32 idx = 0; idx < RTKeyIndexes.size(); ++idx) { + TStatusValidator::Validate(KeyIndex->Append(RTKeyIndexes[idx])); + if (idx) { + AFL_VERIFY((arrRecordIndexValue->Value(idx - 1) < arrRecordIndexValue->Value(idx)) || + (arrRecordIndexValue->Value(idx - 1) == arrRecordIndexValue->Value(idx) && RTKeyIndexes[idx - 1] < RTKeyIndexes[idx]))("r1", + arrRecordIndexValue->Value(idx - 1))("r2", arrRecordIndexValue->Value(idx))( + "k1", RTKeyIndexes[idx - 1])("k2", RTKeyIndexes[idx]); + } + } + } + auto arrKeyIndexes = NArrow::FinishBuilder(std::move(Builders[1])); + std::vector<std::shared_ptr<arrow::Array>> arrays = { arrRecordIndex, arrKeyIndexes, arrValues }; + return TOthersData(*resultStats, std::make_shared<TGeneralContainer>(arrow::RecordBatch::Make(GetSchema(), RecordsCount, arrays))); +} + +TOthersData TOthersData::Slice(const ui32 offset, const ui32 count, const TSettings& settings) const { + AFL_VERIFY(Records->GetColumnsCount() == 3); + if (!count) { + return TOthersData::BuildEmpty(); + } + TOthersData::TIterator itOthersData = BuildIterator(); + std::optional<ui32> startPosition = itOthersData.FindPosition(offset); + std::optional<ui32> finishPosition = itOthersData.FindPosition(offset + count); + if (!startPosition || startPosition == finishPosition) { + return TOthersData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(0)); + } + std::map<ui32, TDictStats::TRTStats> usedKeys; + { + itOthersData.MoveToPosition(*startPosition); + for (; itOthersData.IsValid() && itOthersData.GetRecordIndex() < offset + count; itOthersData.Next()) { + auto itUsedKey = usedKeys.find(itOthersData.GetKeyIndex()); + if (itUsedKey == usedKeys.end()) { + itUsedKey = usedKeys.emplace(itOthersData.GetKeyIndex(), Stats.GetColumnName(itOthersData.GetKeyIndex())).first; + } + itUsedKey->second.AddValue(itOthersData.GetValue()); + } + } + std::vector<ui32> keyIndexDecoder; + if (usedKeys.size()) { + keyIndexDecoder.resize(usedKeys.rbegin()->first + 1, Max<ui32>()); + ui32 idx = 0; + for (auto&& i : usedKeys) { + keyIndexDecoder[i.first] = idx++; + } + } + TDictStats::TBuilder statBuilder; + for (auto&& i : usedKeys) { + statBuilder.Add(i.second.GetKeyName(), i.second.GetRecordsCount(), i.second.GetDataSize(), i.second.GetAccessorType(settings, count)); + } + TDictStats sliceStats = statBuilder.Finish(); + + { + auto recordIndexBuilder = NArrow::MakeBuilder(arrow::uint32()); + auto keyIndexBuilder = NArrow::MakeBuilder(arrow::uint32()); + itOthersData.MoveToPosition(*startPosition); + for (; itOthersData.IsValid() && itOthersData.GetRecordIndex() < offset + count; itOthersData.Next()) { + NArrow::Append<arrow::UInt32Type>(*recordIndexBuilder, itOthersData.GetRecordIndex() - offset); + AFL_VERIFY(itOthersData.GetKeyIndex() < keyIndexDecoder.size()); + const ui32 newKeyIndex = keyIndexDecoder[itOthersData.GetKeyIndex()]; + AFL_VERIFY(newKeyIndex < sliceStats.GetColumnsCount()); + NArrow::Append<arrow::UInt32Type>(*keyIndexBuilder, keyIndexDecoder[itOthersData.GetKeyIndex()]); + } + auto recordIndexes = NArrow::FinishBuilder(std::move(recordIndexBuilder)); + auto keyIndexes = NArrow::FinishBuilder(std::move(keyIndexBuilder)); + std::vector<std::shared_ptr<IChunkedArray>> arrays = { std::make_shared<TTrivialArray>(recordIndexes), + std::make_shared<TTrivialArray>(keyIndexes), + GetValuesArray()->ISlice(*startPosition, finishPosition.value_or(GetValuesArray()->GetRecordsCount()) - *startPosition) }; + auto sliceRecords = std::make_shared<TGeneralContainer>(GetSchema(), std::move(arrays)); + return TOthersData(sliceStats, sliceRecords); + } +} + +TOthersData TOthersData::BuildEmpty() { + static TOthersData result = []() { + auto records = std::make_shared<TGeneralContainer>(0); + for (auto&& f : TOthersData::GetSchema()->fields()) { + records->AddField(f, NArrow::TThreadSimpleArraysCache::GetNull(f->type(), 0)).Validate(); + } + return TOthersData(TDictStats::BuildEmpty(), records); + }(); + return result; +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h new file mode 100644 index 0000000000..bb71bef977 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h @@ -0,0 +1,190 @@ +#pragma once + +#include "stats.h" + +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/common/container.h> + +#include <ydb/library/accessor/accessor.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TOthersData { +private: + TDictStats Stats; + YDB_READONLY_DEF(std::shared_ptr<TGeneralContainer>, Records); + +public: + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("stats", Stats.DebugJson()); + result.InsertValue("records", Records->DebugJson(true)); + return result; + } + + TOthersData Slice(const ui32 offset, const ui32 count, const TSettings& settings) const; + + static TOthersData BuildEmpty(); + + ui64 GetRawSize() const { + return Records->GetRawSizeVerified(); + } + + class TIterator { + private: + const ui32 RecordsCount; + ui32 CurrentIndex = 0; + + IChunkedArray::TReader RecordIndexReader; + std::shared_ptr<arrow::UInt32Array> RecordIndex; + + IChunkedArray::TReader KeyIndexReader; + std::shared_ptr<arrow::UInt32Array> KeyIndex; + + IChunkedArray::TReader ValuesReader; + std::shared_ptr<arrow::StringArray> Values; + + public: + TIterator(const std::shared_ptr<TGeneralContainer>& records) + : RecordsCount(records->GetRecordsCount()) + , RecordIndexReader(records->GetColumnVerified(0)) + , KeyIndexReader(records->GetColumnVerified(1)) + , ValuesReader(records->GetColumnVerified(2)) { + if (RecordsCount) { + auto recordIndexChunk = RecordIndexReader.GetReadChunk(0); + AFL_VERIFY(recordIndexChunk.GetArray()->length() == RecordsCount); + RecordIndex = std::static_pointer_cast<arrow::UInt32Array>(recordIndexChunk.GetArray()); + + auto keyIndexChunk = KeyIndexReader.GetReadChunk(0); + AFL_VERIFY(keyIndexChunk.GetArray()->length() == RecordsCount); + KeyIndex = std::static_pointer_cast<arrow::UInt32Array>(keyIndexChunk.GetArray()); + + auto valuesChunk = ValuesReader.GetReadChunk(0); + AFL_VERIFY(valuesChunk.GetArray()->length() == RecordsCount); + Values = std::static_pointer_cast<arrow::StringArray>(valuesChunk.GetArray()); + } + + CurrentIndex = 0; + } + + std::optional<ui32> FindPosition(const ui32 findRecordIndex) const { + return NArrow::FindUpperOrEqualPosition(*RecordIndex, findRecordIndex); + } + + void MoveToPosition(const ui32 index) { + CurrentIndex = index; + AFL_VERIFY(IsValid()); + } + + ui32 GetRecordIndex() const { + AFL_VERIFY(IsValid()); + return RecordIndex->Value(CurrentIndex); + } + + ui32 GetKeyIndex() const { + AFL_VERIFY(IsValid()); + return KeyIndex->Value(CurrentIndex); + } + + std::string_view GetValue() const { + AFL_VERIFY(IsValid()); + auto view = Values->GetView(CurrentIndex); + return std::string_view(view.data(), view.size()); + } + + bool HasValue() const { + AFL_VERIFY(IsValid()); + return true; + } + + bool Next() { + AFL_VERIFY(IsValid()); + return ++CurrentIndex < RecordsCount; + } + + bool IsValid() const { + return CurrentIndex < RecordsCount; + } + }; + + TIterator BuildIterator() const { + return TIterator(Records); + } + + const TDictStats& GetStats() const { + return Stats; + } + + ui32 GetColumnsCount() const { + return Records->num_rows(); + } + + static std::shared_ptr<arrow::Schema> GetSchema() { + static arrow::FieldVector fields = { std::make_shared<arrow::Field>("record_idx", arrow::uint32()), + std::make_shared<arrow::Field>("key", arrow::uint32()), std::make_shared<arrow::Field>("value", arrow::utf8()) }; + static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(fields); + return result; + } + + TOthersData(const TDictStats& stats, const std::shared_ptr<TGeneralContainer>& records) + : Stats(stats) + , Records(records) { + AFL_VERIFY(Records->num_columns() == 3)("count", Records->num_columns()); + AFL_VERIFY(Records->GetColumnVerified(0)->GetDataType()->id() == arrow::uint32()->id()); + AFL_VERIFY(Records->GetColumnVerified(1)->GetDataType()->id() == arrow::uint32()->id()); + AFL_VERIFY(Records->GetColumnVerified(2)->GetDataType()->id() == arrow::utf8()->id()); + } + + const std::shared_ptr<IChunkedArray>& GetValuesArray() const { + return Records->GetColumnVerified(2); + } + + class TFinishContext { + private: + const TDictStats ActualStats; + YDB_READONLY_DEF(std::optional<std::vector<ui32>>, Remap); + + public: + const TDictStats& GetActualStats() const { + return ActualStats; + } + + TFinishContext(const TDictStats& actualStats, const std::vector<ui32>& remap) + : ActualStats(actualStats) + , Remap(remap) { + } + + TFinishContext(const TDictStats& actualStats) + : ActualStats(actualStats){ + } + }; + + class TBuilderWithStats: TNonCopyable { + private: + std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; + arrow::UInt32Builder* RecordIndex; + arrow::UInt32Builder* KeyIndex; + std::vector<ui32> RTKeyIndexes; + arrow::StringBuilder* Values; + std::optional<ui32> LastRecordIndex; + std::optional<ui32> LastKeyIndex; + ui32 RecordsCount = 0; + YDB_READONLY_DEF(std::vector<TDictStats::TRTStatsValue>, StatsByKeyIndex); + + public: + TBuilderWithStats(); + + void Add(const ui32 recordIndex, const ui32 keyIndex, const std::string_view value); + + TOthersData Finish(const TFinishContext& finishContext); + }; + + static std::shared_ptr<TBuilderWithStats> MakeMergedBuilder() { + return std::make_shared<TBuilderWithStats>(); + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/request.cpp b/ydb/core/formats/arrow/accessor/sub_columns/request.cpp new file mode 100644 index 0000000000..82315e8973 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/request.cpp @@ -0,0 +1,39 @@ +#include "request.h" +#include "constructor.h" + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +NKikimrArrowAccessorProto::TRequestedConstructor TRequestedConstuctor::DoSerializeToProto() const { + NKikimrArrowAccessorProto::TRequestedConstructor result; + *result.MutableSubColumns()->MutableSettings() = Settings.SerializeToRequestedProto(); + return result; +} + +bool TRequestedConstuctor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) { + return Settings.DeserializeFromRequestedProto(proto.GetSubColumns().GetSettings()); +} + +TConclusionStatus TRequestedConstuctor::DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) { + if (auto columnsLimit = features.Extract<ui32>("COLUMNS_LIMIT")) { + Settings.SetColumnsLimit(*columnsLimit); + } + if (auto kff = features.Extract<ui32>("SPARSED_DETECTOR_KFF")) { + Settings.SetSparsedDetectorKff(*kff); + } + if (auto memLimit = features.Extract<ui32>("MEM_LIMIT_CHUNK")) { + Settings.SetChunkMemoryLimit(*memLimit); + } + if (auto othersFraction = features.Extract<double>("OTHERS_ALLOWED_FRACTION")) { + if (*othersFraction < 0 || 1 < *othersFraction) { + return TConclusionStatus::Fail("others fraction have to be in [0, 1] interval"); + } + Settings.SetOthersAllowedFraction(*othersFraction); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusion<TConstructorContainer> TRequestedConstuctor::DoBuildConstructor() const { + return std::make_shared<TConstructor>(Settings); +} + +} diff --git a/ydb/core/formats/arrow/accessor/sub_columns/request.h b/ydb/core/formats/arrow/accessor/sub_columns/request.h new file mode 100644 index 0000000000..264edd95ac --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/request.h @@ -0,0 +1,32 @@ +#pragma once +#include "settings.h" + +#include <ydb/core/formats/arrow/accessor/abstract/request.h> + +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TRequestedConstuctor: public IRequestedConstructor { +public: + static TString GetClassNameStatic() { + return TGlobalConst::SubColumnsDataAccessorName; + } + +private: + static inline auto Registrator = TFactory::TRegistrator<TRequestedConstuctor>(GetClassNameStatic()); + + TSettings Settings; + + virtual TConclusion<TConstructorContainer> DoBuildConstructor() const override; + virtual NKikimrArrowAccessorProto::TRequestedConstructor DoSerializeToProto() const override; + virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) override; + virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override; + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/settings.cpp b/ydb/core/formats/arrow/accessor/sub_columns/settings.cpp new file mode 100644 index 0000000000..9ac9272452 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/settings.cpp @@ -0,0 +1,5 @@ +#include "settings.h" + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/settings.h b/ydb/core/formats/arrow/accessor/sub_columns/settings.h new file mode 100644 index 0000000000..c4ee71dcb3 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/settings.h @@ -0,0 +1,86 @@ +#pragma once +#include <ydb/core/formats/arrow/arrow_helpers.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/protos/accessor.pb.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TSettings { +private: + YDB_ACCESSOR(ui32, SparsedDetectorKff, 20); + YDB_ACCESSOR(ui32, ColumnsLimit, 1024); + YDB_ACCESSOR(ui32, ChunkMemoryLimit, 50 * 1024 * 1024); + YDB_READONLY(double, OthersAllowedFraction, 0.05); + +public: + TSettings() = default; + TSettings(const ui32 sparsedDetectorKff, const ui32 columnsLimit, const ui32 chunkMemoryLimit, const double othersAllowedFraction) + : SparsedDetectorKff(sparsedDetectorKff) + , ColumnsLimit(columnsLimit) + , ChunkMemoryLimit(chunkMemoryLimit) + , OthersAllowedFraction(othersAllowedFraction) { + AFL_VERIFY(OthersAllowedFraction >= 0 && OthersAllowedFraction <= 1)("others_fraction", OthersAllowedFraction); + } + + TSettings& SetOthersAllowedFraction(const double value) { + AFL_VERIFY(value >= 0 && value <= 1)("others_fraction_value", value); + OthersAllowedFraction = value; + return *this; + } + + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("sparsed_detector_kff", SparsedDetectorKff); + result.InsertValue("columns_limit", ColumnsLimit); + result.InsertValue("memory_limit", ChunkMemoryLimit); + result.InsertValue("others_allowed_fraction", OthersAllowedFraction); + return result; + } + + bool IsSparsed(const ui32 keyUsageCount, const ui32 recordsCount) const { + AFL_VERIFY(recordsCount); + return keyUsageCount * SparsedDetectorKff < recordsCount; + } + + template <class TProto> + void SerializeToProtoImpl(TProto& result) const { + result.SetSparsedDetectorKff(SparsedDetectorKff); + result.SetColumnsLimit(ColumnsLimit); + result.SetChunkMemoryLimit(ChunkMemoryLimit); + result.SetOthersAllowedFraction(OthersAllowedFraction); + } + + template <class TProto> + bool DeserializeFromProtoImpl(const TProto& proto) { + SparsedDetectorKff = proto.GetSparsedDetectorKff(); + ColumnsLimit = proto.GetColumnsLimit(); + ChunkMemoryLimit = proto.GetChunkMemoryLimit(); + OthersAllowedFraction = proto.GetOthersAllowedFraction(); + return true; + } + + NKikimrArrowAccessorProto::TConstructor::TSubColumns::TSettings SerializeToProto() const { + NKikimrArrowAccessorProto::TConstructor::TSubColumns::TSettings result; + SerializeToProtoImpl(result); + return result; + } + + bool DeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor::TSubColumns::TSettings& proto) { + return DeserializeFromProtoImpl(proto); + } + + NKikimrArrowAccessorProto::TRequestedConstructor::TSubColumns::TSettings SerializeToRequestedProto() const { + NKikimrArrowAccessorProto::TRequestedConstructor::TSubColumns::TSettings result; + SerializeToProtoImpl(result); + return result; + } + + bool DeserializeFromRequestedProto(const NKikimrArrowAccessorProto::TRequestedConstructor::TSubColumns::TSettings& proto) { + return DeserializeFromProtoImpl(proto); + } +}; + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp b/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp new file mode 100644 index 0000000000..e38b912f32 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp @@ -0,0 +1,161 @@ +#include "settings.h" +#include "stats.h" + +#include <ydb/core/formats/arrow/accessor/plain/constructor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/constructor.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> + +#include <ydb/library/formats/arrow/arrow_helpers.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { +TSplittedColumns TDictStats::SplitByVolume(const TSettings& settings, const ui32 recordsCount) const { + std::map<ui64, std::vector<TRTStats>> bySize; + ui64 sumSize = 0; + for (ui32 i = 0; i < GetColumnsCount(); ++i) { + bySize[GetColumnSize(i)].emplace_back(GetRTStats(i)); + sumSize += GetColumnSize(i); + } + std::vector<TRTStats> columnStats; + std::vector<TRTStats> otherStats; + ui64 columnsSize = 0; + for (auto it = bySize.rbegin(); it != bySize.rend(); ++it) { + for (auto&& i : it->second) { + AFL_VERIFY(sumSize >= columnsSize); + if (columnStats.size() < settings.GetColumnsLimit() && + (!sumSize || 1.0 * (sumSize - columnsSize) / sumSize > settings.GetOthersAllowedFraction())) { + columnsSize += it->first; + columnStats.emplace_back(std::move(i)); + } else { + otherStats.emplace_back(std::move(i)); + } + } + } + std::sort(columnStats.begin(), columnStats.end()); + std::sort(otherStats.begin(), otherStats.end()); + auto columnsBuilder = MakeBuilder(); + auto othersBuilder = MakeBuilder(); + for (auto&& i : columnStats) { + columnsBuilder.Add(i.GetKeyName(), i.GetRecordsCount(), i.GetDataSize(), i.GetAccessorType(settings, recordsCount)); + } + for (auto&& i : otherStats) { + othersBuilder.Add(i.GetKeyName(), i.GetRecordsCount(), i.GetDataSize(), i.GetAccessorType(settings, recordsCount)); + } + return TSplittedColumns(columnsBuilder.Finish(), othersBuilder.Finish()); +} + +TDictStats TDictStats::Merge(const std::vector<const TDictStats*>& stats, const TSettings& settings, const ui32 recordsCount) { + std::map<std::string_view, TRTStats> resultMap; + for (auto&& i : stats) { + for (ui32 idx = 0; idx < i->GetColumnsCount(); ++idx) { + auto it = resultMap.find(i->GetColumnName(idx)); + if (it == resultMap.end()) { + it = resultMap.emplace(i->GetColumnName(idx), TRTStats(i->GetColumnName(idx))).first; + } + it->second.Add(*i, idx); + } + } + auto builder = MakeBuilder(); + for (auto&& i : resultMap) { + builder.Add(i.second.GetKeyName(), i.second.GetRecordsCount(), i.second.GetDataSize(), i.second.GetAccessorType(settings, recordsCount)); + } + return builder.Finish(); +} + +ui32 TDictStats::GetColumnRecordsCount(const ui32 index) const { + AFL_VERIFY(index < DataRecordsCount->length()); + return DataRecordsCount->Value(index); +} + +ui32 TDictStats::GetColumnSize(const ui32 index) const { + AFL_VERIFY(index < DataSize->length()); + return DataSize->Value(index); +} + +std::string_view TDictStats::GetColumnName(const ui32 index) const { + AFL_VERIFY(index < DataNames->length()); + auto view = DataNames->GetView(index); + return std::string_view(view.data(), view.size()); +} + +TDictStats::TDictStats(const std::shared_ptr<arrow::RecordBatch>& original) + : Original(original) { + AFL_VERIFY(Original->num_columns() == 4)("count", Original->num_columns()); + AFL_VERIFY(Original->column(0)->type()->id() == arrow::utf8()->id()); + AFL_VERIFY(Original->column(1)->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Original->column(2)->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Original->column(3)->type()->id() == arrow::uint8()->id()); + DataNames = std::static_pointer_cast<arrow::StringArray>(Original->column(0)); + DataRecordsCount = std::static_pointer_cast<arrow::UInt32Array>(Original->column(1)); + DataSize = std::static_pointer_cast<arrow::UInt32Array>(Original->column(2)); + AccessorType = std::static_pointer_cast<arrow::UInt8Array>(Original->column(3)); +} + +TConstructorContainer TDictStats::GetAccessorConstructor(const ui32 columnIndex) const { + switch (GetAccessorType(columnIndex)) { + case IChunkedArray::EType::Array: + return std::make_shared<NAccessor::NPlain::TConstructor>(); + case IChunkedArray::EType::SparsedArray: + return std::make_shared<NAccessor::NSparsed::TConstructor>(); + case IChunkedArray::EType::Undefined: + case IChunkedArray::EType::SerializedChunkedArray: + case IChunkedArray::EType::SubColumnsArray: + case IChunkedArray::EType::ChunkedArray: + AFL_VERIFY(false); + return TConstructorContainer(); + } +} + +TDictStats TDictStats::BuildEmpty() { + return TDictStats(MakeEmptyBatch(GetStatsSchema())); +} + +TString TDictStats::SerializeAsString(const std::shared_ptr<NSerialization::ISerializer>& serializer) const { + AFL_VERIFY(serializer); + return serializer->SerializePayload(Original); +} + +IChunkedArray::EType TDictStats::GetAccessorType(const ui32 columnIndex) const { + AFL_VERIFY(columnIndex < AccessorType->length()); + return (IChunkedArray::EType)AccessorType->Value(columnIndex); +} + +TDictStats::TBuilder::TBuilder() { + Builders = NArrow::MakeBuilders(GetStatsSchema()); + AFL_VERIFY(Builders.size() == 4); + AFL_VERIFY(Builders[0]->type()->id() == arrow::utf8()->id()); + AFL_VERIFY(Builders[1]->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Builders[2]->type()->id() == arrow::uint32()->id()); + AFL_VERIFY(Builders[3]->type()->id() == arrow::uint8()->id()); + Names = static_cast<arrow::StringBuilder*>(Builders[0].get()); + Records = static_cast<arrow::UInt32Builder*>(Builders[1].get()); + DataSize = static_cast<arrow::UInt32Builder*>(Builders[2].get()); + AccessorType = static_cast<arrow::UInt8Builder*>(Builders[3].get()); +} + +void TDictStats::TBuilder::Add(const TString& name, const ui32 recordsCount, const ui32 dataSize, const IChunkedArray::EType accessorType) { + AFL_VERIFY(Builders.size()); + if (!LastKeyName) { + LastKeyName = name; + } else { + AFL_VERIFY(*LastKeyName < name)("last", LastKeyName)("name", name); + } + AFL_VERIFY(recordsCount); + TStatusValidator::Validate(Names->Append(name.data(), name.size())); + TStatusValidator::Validate(Records->Append(recordsCount)); + TStatusValidator::Validate(DataSize->Append(dataSize)); + TStatusValidator::Validate(AccessorType->Append((ui8)accessorType)); + ++RecordsCount; +} + +void TDictStats::TBuilder::Add( + const std::string_view name, const ui32 recordsCount, const ui32 dataSize, const IChunkedArray::EType accessorType) { + Add(TString(name.data(), name.size()), recordsCount, dataSize, accessorType); +} + +TDictStats TDictStats::TBuilder::Finish() { + AFL_VERIFY(Builders.size()); + auto arrays = NArrow::Finish(std::move(Builders)); + return TDictStats(arrow::RecordBatch::Make(GetStatsSchema(), RecordsCount, std::move(arrays))); +} + +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.h b/ydb/core/formats/arrow/accessor/sub_columns/stats.h new file mode 100644 index 0000000000..04fed6e240 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.h @@ -0,0 +1,197 @@ +#pragma once +#include "settings.h" + +#include <ydb/core/formats/arrow/accessor/abstract/constructor.h> + +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/actors/core/log.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow::NAccessor::NSubColumns { + +class TSplittedColumns; + +class TDictStats { +private: + std::shared_ptr<arrow::RecordBatch> Original; + std::shared_ptr<arrow::StringArray> DataNames; + std::shared_ptr<arrow::UInt32Array> DataRecordsCount; + std::shared_ptr<arrow::UInt32Array> DataSize; + std::shared_ptr<arrow::UInt8Array> AccessorType; + +public: + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("key_names", NArrow::DebugJson(DataNames, 1000000, 1000000)["data"]); + result.InsertValue("records", NArrow::DebugJson(DataRecordsCount, 1000000, 1000000)["data"]); + result.InsertValue("size", NArrow::DebugJson(DataSize, 1000000, 1000000)["data"]); + result.InsertValue("accessor", NArrow::DebugJson(AccessorType, 1000000, 1000000)["data"]); + return result; + } + static TDictStats BuildEmpty(); + TString SerializeAsString(const std::shared_ptr<NSerialization::ISerializer>& serializer) const; + + std::optional<ui32> GetKeyIndexOptional(const std::string_view keyName) const { + for (ui32 i = 0; i < DataNames->length(); ++i) { + const auto arrView = DataNames->GetView(i); + if (std::string_view(arrView.data(), arrView.size()) == keyName) { + return i; + } + } + return std::nullopt; + } + + ui32 GetKeyIndexVerified(const std::string_view keyName) const { + for (ui32 i = 0; i < DataNames->length(); ++i) { + const auto arrView = DataNames->GetView(i); + if (std::string_view(arrView.data(), arrView.size()) == keyName) { + return i; + } + } + AFL_VERIFY(false); + return 0; + } + + class TRTStatsValue { + private: + YDB_READONLY(ui32, RecordsCount, 0); + YDB_READONLY(ui32, DataSize, 0); + + public: + TRTStatsValue() = default; + TRTStatsValue(const ui32 recordsCount, const ui32 dataSize) + : RecordsCount(recordsCount) + , DataSize(dataSize) { + } + + void AddValue(const std::string_view str) { + ++RecordsCount; + DataSize += str.size(); + } + + void Add(const TDictStats& stats, const ui32 idx) { + RecordsCount += stats.GetColumnRecordsCount(idx); + DataSize += stats.GetColumnSize(idx); + } + + IChunkedArray::EType GetAccessorType(const TSettings& settings, const ui32 recordsCount) const { + return settings.IsSparsed(RecordsCount, recordsCount) ? IChunkedArray::EType::SparsedArray : IChunkedArray::EType::Array; + } + }; + + class TRTStats: public TRTStatsValue { + private: + using TBase = TRTStatsValue; + YDB_READONLY_DEF(TString, KeyName); + + public: + TRTStats(const TString& keyName) + : KeyName(keyName) { + } + TRTStats(const TString& keyName, const ui32 recordsCount, const ui32 dataSize) + : TBase(recordsCount, dataSize) + , KeyName(keyName) { + } + + TRTStats(const std::string_view keyName) + : KeyName(keyName.data(), keyName.size()) { + } + TRTStats(const std::string_view keyName, const ui32 recordsCount, const ui32 dataSize) + : TBase(recordsCount, dataSize) + , KeyName(keyName.data(), keyName.size()) { + } + + bool operator<(const TRTStats& item) const { + return KeyName < item.KeyName; + } + }; + + static TDictStats Merge(const std::vector<const TDictStats*>& stats, const TSettings& settings, const ui32 recordsCount); + + TSplittedColumns SplitByVolume(const TSettings& settings, const ui32 recordsCount) const; + + class TBuilder: TNonCopyable { + private: + std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; + arrow::StringBuilder* Names; + arrow::UInt32Builder* Records; + arrow::UInt32Builder* DataSize; + arrow::UInt8Builder* AccessorType; + + std::optional<TString> LastKeyName; + ui32 RecordsCount = 0; + + public: + TBuilder(); + void Add(const TString& name, const ui32 recordsCount, const ui32 dataSize, const IChunkedArray::EType accessorType); + void Add(const std::string_view name, const ui32 recordsCount, const ui32 dataSize, const IChunkedArray::EType accessorType); + TDictStats Finish(); + }; + + static TBuilder MakeBuilder() { + return TBuilder(); + } + + std::shared_ptr<arrow::Schema> BuildColumnsSchema() const { + arrow::FieldVector fields; + for (ui32 i = 0; i < DataNames->length(); ++i) { + const auto view = DataNames->GetView(i); + fields.emplace_back(std::make_shared<arrow::Field>(std::string(view.data(), view.size()), arrow::utf8())); + } + return std::make_shared<arrow::Schema>(fields); + } + + TRTStats GetRTStats(const ui32 index) const { + auto view = GetColumnName(index); + return TRTStats(TString(view.data(), view.size()), GetColumnRecordsCount(index), GetColumnSize(index)); + } + + ui32 GetColumnsCount() const { + return Original->num_rows(); + } + + TConstructorContainer GetAccessorConstructor(const ui32 columnIndex) const; + IChunkedArray::EType GetAccessorType(const ui32 columnIndex) const; + + std::string_view GetColumnName(const ui32 index) const; + TString GetColumnNameString(const ui32 index) const { + auto view = GetColumnName(index); + return TString(view.data(), view.size()); + } + ui32 GetColumnRecordsCount(const ui32 index) const; + ui32 GetColumnSize(const ui32 index) const; + + static std::shared_ptr<arrow::Schema> GetStatsSchema() { + static arrow::FieldVector fields = { std::make_shared<arrow::Field>("name", arrow::utf8()), + std::make_shared<arrow::Field>("count", arrow::uint32()), std::make_shared<arrow::Field>("size", arrow::uint32()), + std::make_shared<arrow::Field>("accessor_type", arrow::uint8()) }; + static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(fields); + return result; + } + + bool IsSparsed(const ui32 columnIndex, const ui32 recordsCount, const TSettings& settings) const; + TDictStats(const std::shared_ptr<arrow::RecordBatch>& original); +}; + +class TSplittedColumns { +private: + TDictStats Columns; + TDictStats Others; + +public: + TDictStats ExtractColumns() { + return std::move(Columns); + } + TDictStats ExtractOthers() { + return std::move(Others); + } + TSplittedColumns(TDictStats&& columns, TDictStats&& others) + : Columns(std::move(columns)) + , Others(std::move(others)) { + } +}; +} // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp new file mode 100644 index 0000000000..5bb31befcb --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp @@ -0,0 +1,116 @@ +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <yql/essentials/types/binary_json/read.h> +#include <yql/essentials/types/binary_json/write.h> + +#include <regex> + +Y_UNIT_TEST_SUITE(SubColumnsArrayAccessor) { + using namespace NKikimr::NArrow::NAccessor; + using namespace NKikimr::NArrow; + using namespace NKikimr; + + std::string PrepareToCompare(const std::string& str) { + return std::regex_replace(str, std::regex(" |\\n"), ""); + } + + TString PrintBinaryJsons(const std::shared_ptr<arrow::ChunkedArray>& array) { + TStringBuilder sb; + sb << "["; + for (auto&& i : array->chunks()) { + sb << "["; + AFL_VERIFY(i->type()->id() == arrow::binary()->id()); + auto views = std::static_pointer_cast<arrow::BinaryArray>(i); + for (ui32 r = 0; r < views->length(); ++r) { + if (views->IsNull(r)) { + sb << "null"; + } else { + sb << NBinaryJson::SerializeToJson(TStringBuf(views->GetView(r).data(), views->GetView(r).size())); + } + if (r + 1 != views->length()) { + sb << ","; + } + } + sb << "]"; + } + sb << "]"; + return sb; + } + + Y_UNIT_TEST(SlicesDef) { + NSubColumns::TSettings settings(4, 1, 0); + + const std::vector<TString> jsons = { + R"({"a" : 1, "b" : 1, "c" : "111"})", + "null", + R"({"a1" : 2, "b" : 2, "c" : "222"})", + R"({"a" : 3, "b" : 3, "c" : "333"})", + "null", + R"({"a" : 5, "b1" : 5})", + }; + + TTrivialArray::TPlainBuilder<arrow::BinaryType> arrBuilder; + ui32 idx = 0; + for (auto&& i : jsons) { + if (i != "null") { + auto v = NBinaryJson::SerializeToBinaryJson(i); + NBinaryJson::TBinaryJson* bJson = std::get_if<NBinaryJson::TBinaryJson>(&v); + arrBuilder.AddRecord(idx, std::string_view(bJson->data(), bJson->size())); + } + ++idx; + } + auto bJsonArr = arrBuilder.Finish(jsons.size()); + auto arrData = TSubColumnsArray::Make(bJsonArr, std::make_shared<NSubColumns::TFirstLevelSchemaData>(), settings).DetachResult(); + Cerr << arrData->DebugJson() << Endl; + AFL_VERIFY(PrintBinaryJsons(arrData->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")( + "string", PrintBinaryJsons(arrData->GetChunkedArray())); + { + auto arrSlice = arrData->ISlice(0, 6); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")( + "string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + } + { + auto arrSlice = arrData->ISlice(0, 5); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null]])")( + "string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + } + { + auto arrSlice = arrData->ISlice(0, 0); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([])")("string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["columns_data"]["stats"].GetStringRobust() == R"({"accessor":[],"size":[],"key_names":[],"records":[]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["others_data"]["stats"].GetStringRobust() == R"({"accessor":[],"size":[],"key_names":[],"records":[]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + } + { + auto arrSlice = arrData->ISlice(0, 2); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null]])")( + "string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["columns_data"]["stats"].GetStringRobust() == R"({"accessor":[1],"size":[12],"key_names":["c"],"records":[1]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["others_data"]["stats"].GetStringRobust() == R"({"accessor":[1,1],"size":[1,1],"key_names":["a","b"],"records":[1,1]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + } + { + auto arrSlice = arrData->ISlice(0, 3); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"}]])")( + "string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["columns_data"]["stats"].GetStringRobust() == R"({"accessor":[1],"size":[19],"key_names":["c"],"records":[2]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["others_data"]["stats"].GetStringRobust() == R"({"accessor":[1,1,1],"size":[1,1,2],"key_names":["a","a1","b"],"records":[1,1,2]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + } + { + auto arrSlice = arrData->ISlice(3, 3); + AFL_VERIFY(PrintBinaryJsons(arrSlice->GetChunkedArray()) == R"([[{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")( + "string", PrintBinaryJsons(arrSlice->GetChunkedArray())); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["columns_data"]["stats"].GetStringRobust() == R"({"accessor":[1],"size":[16],"key_names":["c"],"records":[1]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + AFL_VERIFY(arrSlice->DebugJson()["internal"]["others_data"]["stats"].GetStringRobust() == R"({"accessor":[1,1,1],"size":[2,1,1],"key_names":["a","b","b1"],"records":[2,1,1]})") + ("string", arrSlice->DebugJson().GetStringRobust()); + } + } +}; diff --git a/ydb/core/formats/arrow/accessor/sub_columns/ut/ya.make b/ydb/core/formats/arrow/accessor/sub_columns/ut/ya.make new file mode 100644 index 0000000000..68c75fb208 --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/ut/ya.make @@ -0,0 +1,16 @@ +UNITTEST_FOR(ydb/core/formats/arrow/accessor/sub_columns) + +SIZE(SMALL) + +PEERDIR( + ydb/core/formats/arrow/accessor/sub_columns + yql/essentials/public/udf/service/stub +) + +SRCS( + ut_sub_columns.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/formats/arrow/accessor/sub_columns/ya.make b/ydb/core/formats/arrow/accessor/sub_columns/ya.make new file mode 100644 index 0000000000..18e798da3f --- /dev/null +++ b/ydb/core/formats/arrow/accessor/sub_columns/ya.make @@ -0,0 +1,30 @@ +LIBRARY() + +PEERDIR( + ydb/core/formats/arrow/accessor/abstract + ydb/core/formats/arrow/accessor/plain + ydb/core/formats/arrow/accessor/sparsed + ydb/core/formats/arrow/accessor/composite_serial + ydb/core/formats/arrow/save_load + ydb/core/formats/arrow/common + ydb/library/formats/arrow + ydb/library/formats/arrow/protos + yql/essentials/types/binary_json +) + +SRCS( + GLOBAL constructor.cpp + GLOBAL request.cpp + data_extractor.cpp + accessor.cpp + direct_builder.cpp + settings.cpp + stats.cpp + others_storage.cpp + columns_storage.cpp + iterators.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/formats/arrow/accessor/ya.make b/ydb/core/formats/arrow/accessor/ya.make index 197b97d9ef..add9a81bf8 100644 --- a/ydb/core/formats/arrow/accessor/ya.make +++ b/ydb/core/formats/arrow/accessor/ya.make @@ -5,6 +5,7 @@ PEERDIR( ydb/core/formats/arrow/accessor/plain ydb/core/formats/arrow/accessor/composite_serial ydb/core/formats/arrow/accessor/sparsed + ydb/core/formats/arrow/accessor/sub_columns ) END() diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 27e36bc429..754b465b0e 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -12,6 +12,37 @@ namespace NKikimr::NArrow { +template <class TArray, class TValue> +std::optional<ui32> FindUpperOrEqualPosition(const TArray& arr, const TValue val) { + if (!arr.length()) { + return std::nullopt; + } + TValue left = arr.Value(0); + TValue right = arr.Value(arr.length() - 1); + if (val < left) { + return 0; + } else if (right < val) { + return std::nullopt; + } else if (val == left) { + return 0; + } + ui32 idxLeft = 0; + ui32 idxRight = arr.length() - 1; + while (idxRight - idxLeft > 1) { + const ui32 idxMiddle = 0.5 * (idxRight + idxLeft); + Y_ABORT_UNLESS(idxMiddle != idxRight); + Y_ABORT_UNLESS(idxMiddle != idxLeft); + const TValue middle = arr.Value(idxMiddle); + if (middle < val) { + idxLeft = idxMiddle; + } else if (val < middle) { + idxRight = idxMiddle; + } else { + idxRight = idxMiddle; + } + } + return idxRight; +} arrow::Result<std::shared_ptr<arrow::DataType>> GetArrowType(NScheme::TTypeInfo typeInfo); arrow::Result<std::shared_ptr<arrow::DataType>> GetCSVArrowType(NScheme::TTypeInfo typeId); diff --git a/ydb/core/formats/arrow/common/adapter.h b/ydb/core/formats/arrow/common/adapter.h index f348f43762..3e6acbbeb9 100644 --- a/ydb/core/formats/arrow/common/adapter.h +++ b/ydb/core/formats/arrow/common/adapter.h @@ -58,7 +58,7 @@ public: } slices.emplace_back(batch->Slice(filter.GetStartIndex(), filter.GetSliceSize())); } - return NArrow::ToBatch(TStatusValidator::GetValid(arrow::Table::FromRecordBatches(slices)), true); + return NArrow::ToBatch(TStatusValidator::GetValid(arrow::Table::FromRecordBatches(slices))); } [[nodiscard]] static std::shared_ptr<arrow::RecordBatch> GetEmptySame(const std::shared_ptr<arrow::RecordBatch>& batch) { return batch->Slice(0, 0); diff --git a/ydb/core/formats/arrow/common/container.cpp b/ydb/core/formats/arrow/common/container.cpp index 9100a9fa56..b3b5669e99 100644 --- a/ydb/core/formats/arrow/common/container.cpp +++ b/ydb/core/formats/arrow/common/container.cpp @@ -67,6 +67,9 @@ void TGeneralContainer::DeleteFieldsByIndex(const std::vector<ui32>& idxs) { void TGeneralContainer::Initialize() { std::optional<ui64> recordsCount; + if (Schema->num_fields() == 0) { + recordsCount = 0; + } AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size()); for (i32 i = 0; i < Schema->num_fields(); ++i) { AFL_VERIFY(Columns[i]); @@ -134,7 +137,7 @@ TGeneralContainer::TGeneralContainer(const ui32 recordsCount) , Schema(std::make_shared<NModifier::TSchema>()) { } -std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameVerified(const std::string& fieldId) const { +std::shared_ptr<NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameVerified(const std::string& fieldId) const { auto result = GetAccessorByNameOptional(fieldId); AFL_VERIFY(result)("event", "cannot_find_accessor_in_general_container")("field_id", fieldId)("schema", Schema->ToString()); return result; @@ -222,12 +225,18 @@ TConclusionStatus TGeneralContainer::SyncSchemaTo( return TConclusionStatus::Success(); } -TString TGeneralContainer::DebugString() const { - TStringBuilder result; +NJson::TJsonValue TGeneralContainer::DebugJson(const bool withData) const { + NJson::TJsonValue result; if (RecordsCount) { - result << "records_count=" << *RecordsCount << ";"; + result.InsertValue("records_count", *RecordsCount); + } + result.InsertValue("schema", Schema->ToString()); + if (withData) { + auto& arrData = result.InsertValue("data", NJson::JSON_ARRAY); + for (auto&& i : Columns) { + arrData.AppendValue(i->DebugJson()); + } } - result << "schema=" << Schema->ToString() << ";"; return result; } diff --git a/ydb/core/formats/arrow/common/container.h b/ydb/core/formats/arrow/common/container.h index 23f3279e8d..5e4705f201 100644 --- a/ydb/core/formats/arrow/common/container.h +++ b/ydb/core/formats/arrow/common/container.h @@ -25,17 +25,37 @@ class TGeneralContainer { private: std::optional<ui64> RecordsCount; YDB_READONLY_DEF(std::shared_ptr<NModifier::TSchema>, Schema); - std::vector<std::shared_ptr<NAccessor::IChunkedArray>> Columns; + YDB_READONLY_DEF(std::vector<std::shared_ptr<NAccessor::IChunkedArray>>, Columns); void Initialize(); public: TGeneralContainer(const ui32 recordsCount); + TGeneralContainer Slice(const ui32 offset, const ui32 count) const { + std::vector<std::shared_ptr<NAccessor::IChunkedArray>> columns; + for (auto&& i : Columns) { + columns.emplace_back(i->ISlice(offset, count)); + } + return TGeneralContainer(Schema->GetFields(), std::move(columns)); + } + + ui64 GetRawSizeVerified() const { + ui64 result = 0; + for (auto&& i : Columns) { + result += i->GetRawSizeVerified(); + } + return result; + } + ui32 GetRecordsCount() const { AFL_VERIFY(RecordsCount); return *RecordsCount; } - TString DebugString() const; + NJson::TJsonValue DebugJson(const bool withData = false) const; + + TString DebugString(const bool withData = false) const { + return DebugJson(withData).GetStringRobust(); + } [[nodiscard]] TConclusionStatus SyncSchemaTo(const std::shared_ptr<arrow::Schema>& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults); diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index 2b953b55e0..9860ffc56d 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -1,13 +1,15 @@ #pragma once -#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/exec.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_aggregate.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> -#include <util/system/types.h> +#include "arrow_filter.h" +#include "arrow_helpers.h" -#include <ydb/library/arrow_kernels/operations.h> #include <ydb/core/scheme_types/scheme_types_defs.h> -#include "arrow_helpers.h" -#include "arrow_filter.h" + +#include <ydb/library/arrow_kernels/operations.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_aggregate.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/exec.h> +#include <util/system/types.h> namespace NKikimr::NArrow { @@ -24,7 +26,7 @@ enum class EAggregate { NumRows = 7, }; -} +} // namespace NKikimr::NArrow namespace NKikimr::NSsa { @@ -32,10 +34,12 @@ using EOperation = NArrow::EOperation; using EAggregate = NArrow::EAggregate; using TFunctionPtr = std::shared_ptr<arrow::compute::ScalarFunction>; -const char * GetFunctionName(EOperation op); -const char * GetFunctionName(EAggregate op); -const char * GetHouseFunctionName(EAggregate op); -inline const char * GetHouseGroupByName() { return "ch.group_by"; } +const char* GetFunctionName(EOperation op); +const char* GetFunctionName(EAggregate op); +const char* GetHouseFunctionName(EAggregate op); +inline const char* GetHouseGroupByName() { + return "ch.group_by"; +} EOperation ValidateOperation(EOperation op, ui32 argsSize); class TDatumBatch { @@ -92,7 +96,6 @@ private: : GeneratedFlag(generated) , ColumnName(columnName) , ColumnId(columnId) { - } public: @@ -116,16 +119,19 @@ public: template <class TAssignObject> class IStepFunction { using TSelf = IStepFunction<TAssignObject>; + protected: arrow::compute::ExecContext* Ctx; + public: using TPtr = std::shared_ptr<TSelf>; IStepFunction(arrow::compute::ExecContext* ctx) - : Ctx(ctx) - {} + : Ctx(ctx) { + } - virtual ~IStepFunction() {} + virtual ~IStepFunction() { + } virtual arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const = 0; @@ -144,6 +150,7 @@ protected: class TAssign { private: YDB_ACCESSOR_DEF(std::optional<ui32>, YqlOperationId); + public: using TOperationType = EOperation; @@ -151,134 +158,64 @@ public: : Column(column) , Operation(ValidateOperation(op, args.size())) , Arguments(std::move(args)) - , FuncOpts(nullptr) - {} + , FuncOpts(nullptr) { + } TAssign(const TColumnInfo& column, EOperation op, std::vector<TColumnInfo>&& args, std::shared_ptr<arrow::compute::FunctionOptions> funcOpts) : Column(column) , Operation(ValidateOperation(op, args.size())) , Arguments(std::move(args)) - , FuncOpts(std::move(funcOpts)) - {} - - explicit TAssign(const TColumnInfo& column, bool value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::BooleanScalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, i8 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::Int8Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, ui8 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::UInt8Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, i16 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::Int16Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, ui16 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::UInt16Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, i32 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::Int32Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, ui32 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::UInt32Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, i64 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::Int64Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, ui64 value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::UInt64Scalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, float value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::FloatScalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, double value) - : Column(column) - , Operation(EOperation::Constant) - , Constant(std::make_shared<arrow::DoubleScalar>(value)) - , FuncOpts(nullptr) - {} - - explicit TAssign(const TColumnInfo& column, const std::string& value, bool binary) - : Column(column) - , Operation(EOperation::Constant) - , Constant( binary ? std::make_shared<arrow::BinaryScalar>(arrow::Buffer::FromString(value), arrow::binary()) - : std::make_shared<arrow::StringScalar>(value)) - , FuncOpts(nullptr) - {} + , FuncOpts(std::move(funcOpts)) { + } TAssign(const TColumnInfo& column, const std::shared_ptr<arrow::Scalar>& value) : Column(column) , Operation(EOperation::Constant) , Constant(value) - , FuncOpts(nullptr) - {} + , FuncOpts(nullptr) { + } - TAssign(const TColumnInfo& column, - TFunctionPtr kernelFunction, - std::vector<TColumnInfo>&& args, - std::shared_ptr<arrow::compute::FunctionOptions> funcOpts) + TAssign(const TColumnInfo& column, TFunctionPtr kernelFunction, std::vector<TColumnInfo>&& args, + std::shared_ptr<arrow::compute::FunctionOptions> funcOpts) : Column(column) , Arguments(std::move(args)) , FuncOpts(std::move(funcOpts)) - , KernelFunction(std::move(kernelFunction)) - {} + , KernelFunction(std::move(kernelFunction)) { + } static TAssign MakeTimestamp(const TColumnInfo& column, ui64 value); - bool IsConstant() const { return Operation == EOperation::Constant; } - bool IsOk() const { return Operation != EOperation::Unspecified || !!KernelFunction; } - EOperation GetOperation() const { return Operation; } - const std::vector<TColumnInfo>& GetArguments() const { return Arguments; } - std::shared_ptr<arrow::Scalar> GetConstant() const { return Constant; } - const TColumnInfo& GetColumn() const { return Column; } - const std::string& GetName() const { return Column.GetColumnName(); } - const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); } + bool IsConstant() const { + return Operation == EOperation::Constant; + } + bool IsOk() const { + return Operation != EOperation::Unspecified || !!KernelFunction; + } + EOperation GetOperation() const { + return Operation; + } + const std::vector<TColumnInfo>& GetArguments() const { + return Arguments; + } + std::shared_ptr<arrow::Scalar> GetConstant() const { + return Constant; + } + const TColumnInfo& GetColumn() const { + return Column; + } + const std::string& GetName() const { + return Column.GetColumnName(); + } + const arrow::compute::FunctionOptions* GetOptions() const { + return FuncOpts.get(); + } IStepFunction<TAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const; TString DebugString() const; + private: const TColumnInfo Column; - EOperation Operation{EOperation::Unspecified}; + EOperation Operation{ EOperation::Unspecified }; std::vector<TColumnInfo> Arguments; std::shared_ptr<arrow::Scalar> Constant; std::shared_ptr<arrow::compute::FunctionOptions> FuncOpts; @@ -291,8 +228,7 @@ public: TAggregateAssign(const TColumnInfo& column, EAggregate op = EAggregate::Unspecified) : Column(column) - , Operation(op) - { + , Operation(op) { if (op != EAggregate::Count) { op = EAggregate::Unspecified; } @@ -301,58 +237,66 @@ public: TAggregateAssign(const TColumnInfo& column, EAggregate op, const TColumnInfo& arg) : Column(column) , Operation(op) - , Arguments({arg}) - { + , Arguments({ arg }) { if (Arguments.empty()) { op = EAggregate::Unspecified; } } - TAggregateAssign(const TColumnInfo& column, - TFunctionPtr kernelFunction, - const std::vector<TColumnInfo>& args) + TAggregateAssign(const TColumnInfo& column, TFunctionPtr kernelFunction, const std::vector<TColumnInfo>& args) : Column(column) , Arguments(args) - , KernelFunction(kernelFunction) - {} + , KernelFunction(kernelFunction) { + } - bool IsOk() const { return Operation != EAggregate::Unspecified || !!KernelFunction; } - EAggregate GetOperation() const { return Operation; } - const std::vector<TColumnInfo>& GetArguments() const { return Arguments; } - std::vector<TColumnInfo>& MutableArguments() { return Arguments; } - const std::string& GetName() const { return Column.GetColumnName(); } - const arrow::compute::ScalarAggregateOptions* GetOptions() const { return &ScalarOpts; } + bool IsOk() const { + return Operation != EAggregate::Unspecified || !!KernelFunction; + } + EAggregate GetOperation() const { + return Operation; + } + const std::vector<TColumnInfo>& GetArguments() const { + return Arguments; + } + std::vector<TColumnInfo>& MutableArguments() { + return Arguments; + } + const std::string& GetName() const { + return Column.GetColumnName(); + } + const arrow::compute::ScalarAggregateOptions* GetOptions() const { + return &ScalarOpts; + } IStepFunction<TAggregateAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const; TString DebugString() const; private: TColumnInfo Column; - EAggregate Operation{EAggregate::Unspecified}; + EAggregate Operation{ EAggregate::Unspecified }; std::vector<TColumnInfo> Arguments; - arrow::compute::ScalarAggregateOptions ScalarOpts; // TODO: make correct options + arrow::compute::ScalarAggregateOptions ScalarOpts; // TODO: make correct options TFunctionPtr KernelFunction; }; - /// Group of commands that finishes with projection. Steps add locality for columns definition. /// /// In step we have non-decreasing count of columns (line to line) till projection. So columns are either source /// for the step either defined in this step. -/// It's also possible to use several filters in step. They would be applyed after assignes, just before projection. -/// "Filter (a > 0 AND b <= 42)" is logically equal to "Filret a > 0; Filter b <= 42" +/// It's also possible to use several filters in step. They would be applied after assigns, just before projection. +/// "Filter (a > 0 AND b <= 42)" is logically equal to "Filter a > 0; Filter b <= 42" /// Step combines (f1 AND f2 AND ... AND fn) into one filter and applies it once. You have to split filters in different /// steps if you want to run them separately. I.e. if you expect that f1 is fast and leads to a small row-set. -/// Then when we place all assignes before filters they have the same row count. It's possible to run them in parallel. +/// Then when we place all assigns before filters they have the same row count. It's possible to run them in parallel. class TProgramStep { private: YDB_READONLY_DEF(std::vector<TAssign>, Assignes); - YDB_READONLY_DEF(std::vector<TColumnInfo>, Filters); // List of filter columns. Implicit "Filter by (f1 AND f2 AND .. AND fn)" + YDB_READONLY_DEF(std::vector<TColumnInfo>, Filters); // List of filter columns. Implicit "Filter by (f1 AND f2 AND .. AND fn)" std::set<ui32> FilterOriginalColumnIds; YDB_ACCESSOR_DEF(std::vector<TAggregateAssign>, GroupBy); - YDB_READONLY_DEF(std::vector<TColumnInfo>, GroupByKeys); // TODO: it's possible to use them without GROUP BY for DISTINCT - YDB_READONLY_DEF(std::vector<TColumnInfo>, Projection); // Step's result columns (remove others) + YDB_READONLY_DEF(std::vector<TColumnInfo>, GroupByKeys); // TODO: it's possible to use them without GROUP BY for DISTINCT + YDB_READONLY_DEF(std::vector<TColumnInfo>, Projection); // Step's result columns (remove others) public: using TDatumBatch = TDatumBatch; @@ -458,8 +402,8 @@ public: TProgram() = default; TProgram(std::vector<std::shared_ptr<TProgramStep>>&& steps) - : Steps(std::move(steps)) - {} + : Steps(std::move(steps)) { + } arrow::Status ApplyTo(std::shared_ptr<arrow::Table>& table, arrow::compute::ExecContext* ctx) const { std::vector<std::shared_ptr<arrow::RecordBatch>> batches = NArrow::SliceToRecordBatches(table); @@ -500,18 +444,13 @@ public: } }; -inline arrow::Status ApplyProgram( - std::shared_ptr<arrow::Table>& batch, - const TProgram& program, - arrow::compute::ExecContext* ctx = nullptr) { +inline arrow::Status ApplyProgram(std::shared_ptr<arrow::Table>& batch, const TProgram& program, arrow::compute::ExecContext* ctx = nullptr) { return program.ApplyTo(batch, ctx); } inline arrow::Status ApplyProgram( - std::shared_ptr<arrow::RecordBatch>& batch, - const TProgram& program, - arrow::compute::ExecContext* ctx = nullptr) { + std::shared_ptr<arrow::RecordBatch>& batch, const TProgram& program, arrow::compute::ExecContext* ctx = nullptr) { return program.ApplyTo(batch, ctx); } -} +} // namespace NKikimr::NSsa diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index 32bb67db69..34d4d2f7c7 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -154,13 +154,13 @@ public: [[nodiscard]] bool InitPosition(const ui64 position); - std::shared_ptr<arrow::Table> Slice(const ui64 offset, const ui64 count) const { - std::vector<std::shared_ptr<arrow::ChunkedArray>> slicedArrays; - for (auto&& i : Columns) { - slicedArrays.emplace_back(i->Slice(offset, count)); - } - return arrow::Table::Make(std::make_shared<arrow::Schema>(Fields), slicedArrays, count); - } + std::shared_ptr<arrow::Table> Slice(const ui64 offset, const ui64 count) const { + std::vector<std::shared_ptr<arrow::ChunkedArray>> slicedArrays; + for (auto&& i : Columns) { + slicedArrays.emplace_back(i->Slice(offset, count)); + } + return arrow::Table::Make(std::make_shared<arrow::Schema>(Fields), slicedArrays, count); + } bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const { if (Fields.size() != (size_t)schema->num_fields()) { diff --git a/ydb/core/formats/arrow/save_load/loader.cpp b/ydb/core/formats/arrow/save_load/loader.cpp index 24b01d7ff7..0b87f220cd 100644 --- a/ydb/core/formats/arrow/save_load/loader.cpp +++ b/ydb/core/formats/arrow/save_load/loader.cpp @@ -8,18 +8,14 @@ TString TColumnLoader::DebugString() const { TStringBuilder result; result << "accessor_constructor:" << AccessorConstructor->DebugString() << ";"; result << "result_field:" << ResultField->ToString() << ";"; - if (Transformer) { - result << "transformer:" << Transformer->DebugString() << ";"; - } result << "serializer:" << Serializer->DebugString() << ";"; return result; } -TColumnLoader::TColumnLoader(NTransformation::ITransformer::TPtr transformer, const NSerialization::TSerializerContainer& serializer, +TColumnLoader::TColumnLoader(const NSerialization::TSerializerContainer& serializer, const TConstructorContainer& accessorConstructor, const std::shared_ptr<arrow::Field>& resultField, const std::shared_ptr<arrow::Scalar>& defaultValue, const ui32 columnId) : Serializer(serializer) - , Transformer(transformer) , AccessorConstructor(accessorConstructor) , ResultField(resultField) , DefaultValue(defaultValue) @@ -33,59 +29,29 @@ const std::shared_ptr<arrow::Field>& TColumnLoader::GetField() const { return ResultField; } -arrow::Result<std::shared_ptr<arrow::RecordBatch>> TColumnLoader::Apply(const TString& data) const { - Y_ABORT_UNLESS(Serializer); - arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = - Transformer ? Serializer->Deserialize(data) : Serializer->Deserialize(data, AccessorConstructor->GetExpectedSchema(ResultField)); - if (!columnArray.ok()) { - return columnArray; - } - if (Transformer) { - return Transformer->Transform(*columnArray); - } else { - return columnArray; - } -} - -std::shared_ptr<arrow::RecordBatch> TColumnLoader::ApplyRawVerified(const TString& data) const { - return TStatusValidator::GetValid(Apply(data)); -} - TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const { - return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type()); + return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr()); } TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(const TString& dataStr, const ui32 recordsCount) const { - auto result = Apply(dataStr); - if (result.ok()) { - return BuildAccessor(*result, BuildAccessorContext(recordsCount)); - } else { - AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_blob")("data_size", dataStr.size())( - "expected_records_count", recordsCount)("problem", result.status().ToString()); - return TConclusionStatus::Fail(result.status().ToString()); - } + return BuildAccessor(dataStr, BuildAccessorContext(recordsCount)); } std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const { - auto data = TStatusValidator::GetValid(Apply(dataStr)); - return BuildAccessor(data, BuildAccessorContext(recordsCount)); + return BuildAccessor(dataStr, BuildAccessorContext(recordsCount)).DetachResult(); } -std::shared_ptr<IChunkedArray> TColumnLoader::BuildAccessor( - const std::shared_ptr<arrow::RecordBatch>& batch, const TChunkConstructionData& chunkData) const { - return AccessorConstructor->Construct(batch, chunkData).DetachResult(); +TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::BuildAccessor(const TString& originalData, const TChunkConstructionData& chunkData) const { + return AccessorConstructor->DeserializeFromString(originalData, chunkData); } std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildDefaultAccessor(const ui32 recordsCount) const { - return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult(); + return AccessorConstructor + ->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr())) + .DetachResult(); } bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const { - if (!!Transformer != !!item.Transformer) { - return false; - } else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) { - return false; - } if (!Serializer.IsEqualTo(item.Serializer)) { return false; } diff --git a/ydb/core/formats/arrow/save_load/loader.h b/ydb/core/formats/arrow/save_load/loader.h index 64ecb6c211..77f0c989db 100644 --- a/ydb/core/formats/arrow/save_load/loader.h +++ b/ydb/core/formats/arrow/save_load/loader.h @@ -3,7 +3,6 @@ #include <ydb/core/formats/arrow/serializer/abstract.h> #include <ydb/library/accessor/accessor.h> -#include <ydb/library/formats/arrow/transformer/abstract.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> @@ -11,16 +10,13 @@ namespace NKikimr::NArrow::NAccessor { class TColumnLoader { private: - NSerialization::TSerializerContainer Serializer; - NTransformation::ITransformer::TPtr Transformer; + YDB_READONLY_DEF(NSerialization::TSerializerContainer, Serializer); YDB_READONLY_DEF(NAccessor::TConstructorContainer, AccessorConstructor); YDB_READONLY_DEF(std::shared_ptr<arrow::Field>, ResultField); YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue); const ui32 ColumnId; - arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const; - std::shared_ptr<IChunkedArray> BuildAccessor( - const std::shared_ptr<arrow::RecordBatch>& batch, const TChunkConstructionData& chunkData) const; + TConclusion<std::shared_ptr<IChunkedArray>> BuildAccessor(const TString& originalData, const TChunkConstructionData& chunkData) const; public: std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const; @@ -29,7 +25,7 @@ public: TString DebugString() const; - TColumnLoader(NTransformation::ITransformer::TPtr transformer, const NSerialization::TSerializerContainer& serializer, + TColumnLoader(const NSerialization::TSerializerContainer& serializer, const NAccessor::TConstructorContainer& accessorConstructor, const std::shared_ptr<arrow::Field>& resultField, const std::shared_ptr<arrow::Scalar>& defaultValue, const ui32 columnId); @@ -42,7 +38,6 @@ public: TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const; std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const; TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(const TString& data, const ui32 expectedRecordsCount) const; - std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const; }; } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/save_load/saver.cpp b/ydb/core/formats/arrow/save_load/saver.cpp index 95adebc764..a35ec267e5 100644 --- a/ydb/core/formats/arrow/save_load/saver.cpp +++ b/ydb/core/formats/arrow/save_load/saver.cpp @@ -2,9 +2,8 @@ namespace NKikimr::NArrow::NAccessor { -TColumnSaver::TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer serializer) - : Transformer(transformer) - , Serializer(serializer) +TColumnSaver::TColumnSaver(const NArrow::NSerialization::TSerializerContainer serializer) + : Serializer(serializer) { Y_ABORT_UNLESS(Serializer); } @@ -28,11 +27,7 @@ TString TColumnSaver::Apply(const std::shared_ptr<arrow::RecordBatch>& data) con serializer = it->second; } } - if (Transformer) { - return serializer->SerializeFull(Transformer->Transform(data)); - } else { - return serializer->SerializePayload(data); - } + return serializer->SerializePayload(data); } }
\ No newline at end of file diff --git a/ydb/core/formats/arrow/save_load/saver.h b/ydb/core/formats/arrow/save_load/saver.h index dd9feb4114..955e204e3e 100644 --- a/ydb/core/formats/arrow/save_load/saver.h +++ b/ydb/core/formats/arrow/save_load/saver.h @@ -2,7 +2,6 @@ #include <ydb/core/formats/arrow/serializer/abstract.h> #include <ydb/library/accessor/accessor.h> -#include <ydb/library/formats/arrow/transformer/abstract.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> @@ -11,13 +10,12 @@ namespace NKikimr::NArrow::NAccessor { class TColumnSaver { private: - NArrow::NTransformation::ITransformer::TPtr Transformer; YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); std::map<ui32, NArrow::NSerialization::TSerializerContainer> SerializerBySizeUpperBorder; public: TColumnSaver() = default; - TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer serializer); + TColumnSaver(const NArrow::NSerialization::TSerializerContainer serializer); void AddSerializerWithBorder(const ui32 upperBorder, const NArrow::NSerialization::TSerializerContainer& serializer) { if (Serializer.IsCompatibleForExchange(serializer)) { diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp index a580ed3341..0676d5652b 100644 --- a/ydb/core/formats/arrow/serializer/native.cpp +++ b/ydb/core/formats/arrow/serializer/native.cpp @@ -86,6 +86,9 @@ TString TNativeSerializer::DoSerializePayload(const std::shared_ptr<arrow::Recor arrow::ipc::IpcPayload payload; // Build payload. Compression if set up performed here. TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload)); +#ifndef NDEBUG + TStatusValidator::Validate(batch->ValidateFull()); +#endif int32_t metadata_length = 0; arrow::io::MockOutputStream mock; @@ -99,7 +102,9 @@ TString TNativeSerializer::DoSerializePayload(const std::shared_ptr<arrow::Recor // Write prepared payload into the resultant string. No extra allocation will be made. TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length)); Y_ABORT_UNLESS(out.GetPosition() == str.size()); - AFL_VERIFY_DEBUG(Deserialize(str, batch->schema()).ok()); +#ifndef NDEBUG + TStatusValidator::GetValid(Deserialize(str, batch->schema())); +#endif AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields()); return str; } diff --git a/ydb/core/formats/arrow/splitter/simple.cpp b/ydb/core/formats/arrow/splitter/simple.cpp index a113084b53..1a3bb840c3 100644 --- a/ydb/core/formats/arrow/splitter/simple.cpp +++ b/ydb/core/formats/arrow/splitter/simple.cpp @@ -3,15 +3,18 @@ #include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/library/formats/arrow/common/validation.h> +#include <ydb/library/formats/arrow/splitter/similar_packer.h> + #include <util/string/join.h> namespace NKikimr::NArrow::NSplitter { -std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::Split( + const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const { AFL_VERIFY(data); AFL_VERIFY(field); - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field}); - auto batch = arrow::RecordBatch::Make(schema, data->length(), {data}); + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field }); + auto batch = arrow::RecordBatch::Make(schema, data->length(), { data }); return Split(batch, maxBlobSize); } @@ -30,8 +33,7 @@ public: : Data(data) , SplitFactor(baseSplitFactor) , MaxBlobSize(maxBlobSize) - , ColumnSaver(columnSaver) - { + , ColumnSaver(columnSaver) { AFL_VERIFY(Data && Data->num_rows()); AFL_VERIFY(SplitFactor); } @@ -42,8 +44,7 @@ public: , Result(TSaverSplittedChunk(data, std::move(serializedData))) , SplitFactor(baseSplitFactor) , MaxBlobSize(maxBlobSize) - , ColumnSaver(columnSaver) - { + , ColumnSaver(columnSaver) { AFL_VERIFY(Data && Data->num_rows()); AFL_VERIFY(SplitFactor); } @@ -53,7 +54,7 @@ public: AFL_VERIFY(!Result); AFL_VERIFY(++Iterations < 100); AFL_VERIFY(SplitFactor <= Data->num_rows())("factor", SplitFactor)("records", Data->num_rows())("iteration", Iterations)( - "size", NArrow::GetBatchDataSize(Data)); + "size", NArrow::GetBatchDataSize(Data)); bool found = false; std::vector<TSplitChunk> result; if (SplitFactor == 1) { @@ -93,7 +94,10 @@ public: AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount); auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); - result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver); + result.emplace_back( + std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + + 1, + MaxBlobSize, badSlice, ColumnSaver); badStartPosition = {}; badBatchRecordsCount = 0; badBatchCount = 0; @@ -106,7 +110,9 @@ public: if (badStartPosition) { auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); - result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver); + result.emplace_back( + std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, + MaxBlobSize, badSlice, ColumnSaver); } ++SplitFactor; } @@ -121,9 +127,10 @@ public: std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const { AFL_VERIFY(data->num_rows()); - TSplitChunk baseChunk(Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1, maxBlobSize, data, ColumnSaver); - std::vector<TSplitChunk> chunks = {baseChunk}; - for (auto it = chunks.begin(); it != chunks.end(); ) { + TSplitChunk baseChunk( + Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1, maxBlobSize, data, ColumnSaver); + std::vector<TSplitChunk> chunks = { baseChunk }; + for (auto it = chunks.begin(); it != chunks.end();) { AFL_VERIFY(chunks.size() < 100); if (!!it->GetResult()) { ++it; @@ -145,7 +152,8 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<ar return result; } -std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount( + const std::shared_ptr<arrow::RecordBatch>& data, const std::vector<ui32>& recordsCount) const { std::vector<TSaverSplittedChunk> result; ui64 position = 0; for (auto&& i : recordsCount) { @@ -157,44 +165,9 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::share return result; } -std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const { - auto splitPartSizesLocal = splitPartSizesExt; - Y_ABORT_UNLESS(data); - { - ui32 sumSizes = 0; - for (auto&& i : splitPartSizesExt) { - sumSizes += i; - } - Y_ABORT_UNLESS(sumSizes <= dataSerialization.size()); - - if (sumSizes < dataSerialization.size()) { - splitPartSizesLocal.emplace_back(dataSerialization.size() - sumSizes); - } - } - std::vector<ui64> recordsCount; - i64 remainedRecordsCount = data->num_rows(); - const double rowsPerByte = 1.0 * data->num_rows() / dataSerialization.size(); - i32 remainedParts = splitPartSizesLocal.size(); - for (ui32 idx = 0; idx < splitPartSizesLocal.size(); ++idx) { - AFL_VERIFY(remainedRecordsCount >= remainedParts)("remained_records_count", remainedRecordsCount) - ("remained_parts", remainedParts)("idx", idx)("size", splitPartSizesLocal.size())("sizes", JoinSeq(",", splitPartSizesLocal))("data_size", dataSerialization.size()); - --remainedParts; - i64 expectedRecordsCount = rowsPerByte * splitPartSizesLocal[idx]; - if (expectedRecordsCount < 1) { - expectedRecordsCount = 1; - } else if (remainedRecordsCount < expectedRecordsCount + remainedParts) { - expectedRecordsCount = remainedRecordsCount - remainedParts; - } - if (idx + 1 == splitPartSizesLocal.size()) { - expectedRecordsCount = remainedRecordsCount; - } - Y_ABORT_UNLESS(expectedRecordsCount); - recordsCount.emplace_back(expectedRecordsCount); - remainedRecordsCount -= expectedRecordsCount; - Y_ABORT_UNLESS(remainedRecordsCount >= 0); - } - Y_ABORT_UNLESS(remainedRecordsCount == 0); - return SplitByRecordsCount(data, recordsCount); +std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes( + std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const { + return SplitByRecordsCount(data, TSimilarPacker::SizesToRecordsCount(data->num_rows(), dataSerialization, splitPartSizesExt)); } -} +} // namespace NKikimr::NArrow::NSplitter diff --git a/ydb/core/formats/arrow/splitter/simple.h b/ydb/core/formats/arrow/splitter/simple.h index 1405d3a6dc..4f23cc97e7 100644 --- a/ydb/core/formats/arrow/splitter/simple.h +++ b/ydb/core/formats/arrow/splitter/simple.h @@ -113,8 +113,9 @@ public: std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const; std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const; - std::vector<TSaverSplittedChunk> SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const; - std::vector<TSaverSplittedChunk> SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const; + std::vector<TSaverSplittedChunk> SplitByRecordsCount(const std::shared_ptr<arrow::RecordBatch>& data, const std::vector<ui32>& recordsCount) const; + std::vector<TSaverSplittedChunk> SplitBySizes( + std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const; }; } diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index d7f447a1b2..1b95f9ea8c 100644 --- a/ydb/core/formats/arrow/ut/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -506,7 +506,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) { auto filterInfo = TColumnInfo::Generated(2, "filter"); auto resInfo = TColumnInfo::Generated(3, "res"); - step->AddAssigne(TAssign(yInfo, 56)); + step->AddAssigne(TAssign(yInfo, std::make_shared<arrow::Int64Scalar>(56))); step->AddAssigne(TAssign(resInfo, EOperation::Add, {xInfo, yInfo})); step->AddFilter(filterInfo); step->AddProjection(filterInfo); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index 3142aca264..28baedc4b3 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -173,7 +173,7 @@ private: Y_DEBUG_ABORT_UNLESS(ArrowBatch != nullptr); auto* protoArrowBatch = Remote->Record.MutableArrowBatch(); protoArrowBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema())); - protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch, true))); + protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch))); break; } } diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp new file mode 100644 index 0000000000..ad39fdf69c --- /dev/null +++ b/ydb/core/kqp/ut/olap/json_ut.cpp @@ -0,0 +1,503 @@ +#include "helpers/get_value.h" +#include "helpers/local.h" +#include "helpers/query_executor.h" +#include "helpers/typed_local.h" +#include "helpers/writer.h" + +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/kqp/ut/common/columnshard.h> +#include <ydb/core/tx/columnshard/hooks/testing/controller.h> +#include <ydb/core/tx/columnshard/test_helper/controllers.h> +#include <ydb/core/wrappers/fake_storage.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <util/string/strip.h> + +namespace NKikimr::NKqp { + +Y_UNIT_TEST_SUITE(KqpOlapJson) { + class ICommand { + private: + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) = 0; + + public: + virtual ~ICommand() = default; + + TConclusionStatus Execute(TKikimrRunner& kikimr) { + return DoExecute(kikimr); + } + }; + + class TSchemaCommand: public ICommand { + private: + const TString Command; + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override { + Cerr << "EXECUTE: " << Command << Endl; + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(Command).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + return TConclusionStatus::Success(); + } + + public: + TSchemaCommand(const TString& command) + : Command(command) { + } + }; + + class TDataCommand: public ICommand { + private: + const TString Command; + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override { + Cerr << "EXECUTE: " << Command << Endl; + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto client = kikimr.GetQueryClient(); + auto prepareResult = client.ExecuteQuery(Command, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + return TConclusionStatus::Success(); + } + + public: + TDataCommand(const TString& command) + : Command(command) { + } + }; + + class TSelectCommand: public ICommand { + private: + const TString Command; + const TString Compare; + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override { + Cerr << "EXECUTE: " << Command << Endl; + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto it = kikimr.GetQueryClient().StreamExecuteQuery(Command, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), NYdb::EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + if (Compare) { + Cerr << "COMPARE: " << Compare << Endl; + Cerr << "OUTPUT: " << output << Endl; + CompareYson(output, Compare); + } + return TConclusionStatus::Success(); + } + + public: + TSelectCommand(const TString& command, const TString& compare) + : Command(command) + , Compare(compare) { + } + }; + + class TStopCompactionCommand: public ICommand { + private: + virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override { + auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>(); + AFL_VERIFY(controller); + controller->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + return TConclusionStatus::Success(); + } + + public: + TStopCompactionCommand() { + } + }; + + class TOneCompactionCommand: public ICommand { + private: + virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override { + auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>(); + AFL_VERIFY(controller); + AFL_VERIFY(!controller->IsBackgroundEnable(NKikimr::NYDBTest::ICSController::EBackground::Compaction)); + const i64 compactions = controller->GetCompactionFinishedCounter().Val(); + controller->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + const TInstant start = TInstant::Now(); + while (TInstant::Now() - start < TDuration::Seconds(5)) { + if (compactions < controller->GetCompactionFinishedCounter().Val()) { + Cerr << "COMPACTION_HAPPENED: " << compactions << " -> " << controller->GetCompactionFinishedCounter().Val() << Endl; + break; + } + Cerr << "WAIT_COMPACTION: " << controller->GetCompactionFinishedCounter().Val() << Endl; + Sleep(TDuration::MilliSeconds(300)); + } + AFL_VERIFY(compactions < controller->GetCompactionFinishedCounter().Val()); + controller->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + return TConclusionStatus::Success(); + } + + public: + TOneCompactionCommand() { + } + }; + + class TWaitCompactionCommand: public ICommand { + private: + virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override { + auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>(); + AFL_VERIFY(controller); + controller->WaitCompactions(TDuration::Seconds(5)); + return TConclusionStatus::Success(); + } + + public: + TWaitCompactionCommand() { + } + }; + + class TScriptExecutor { + private: + std::vector<std::shared_ptr<ICommand>> Commands; + + public: + TScriptExecutor(const std::vector<std::shared_ptr<ICommand>>& commands) + : Commands(commands) + { + + } + void Execute() { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + auto settings = TKikimrSettings().SetAppConfig(appConfig).SetColumnShardAlterObjectEnabled(true).SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->SetOverrideMemoryLimitForPortionReading(1e+10); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); + for (auto&& i : Commands) { + i->Execute(kikimr); + } + } + }; + + class TScriptVariator { + private: + std::vector<TScriptExecutor> Scripts; + std::shared_ptr<ICommand> BuildCommand(TString command) { + if (command.StartsWith("SCHEMA:")) { + command = command.substr(7); + return std::make_shared<TSchemaCommand>(command); + } else if (command.StartsWith("DATA:")) { + command = command.substr(5); + return std::make_shared<TDataCommand>(command); + } else if (command.StartsWith("READ:")) { + auto lines = StringSplitter(command.substr(5)).SplitBySet("\n").ToList<TString>(); + int step = 0; + TString request; + TString expectation; + for (auto&& i : lines) { + i = Strip(i); + if (i.StartsWith("EXPECTED:")) { + step = 1; + i = i.substr(9); + } + if (step == 0) { + request += i; + } else if (step == 1) { + expectation += i; + } + } + return std::make_shared<TSelectCommand>(request, expectation); + } else if (command.StartsWith("WAIT_COMPACTION")) { + return std::make_shared<TWaitCompactionCommand>(); + } else if (command.StartsWith("STOP_COMPACTION")) { + return std::make_shared<TStopCompactionCommand>(); + } else if (command.StartsWith("ONE_COMPACTION")) { + return std::make_shared<TOneCompactionCommand>(); + } else { + AFL_VERIFY(false)("command", command); + return nullptr; + } + } + void BuildScripts(const std::vector<std::vector<std::shared_ptr<ICommand>>>& commands, const ui32 currentLayer, + std::vector<std::shared_ptr<ICommand>>& currentScript, std::vector<TScriptExecutor>& scripts) { + if (currentLayer == commands.size()) { + scripts.emplace_back(currentScript); + return; + } + for (auto&& i : commands[currentLayer]) { + currentScript.emplace_back(i); + BuildScripts(commands, currentLayer + 1, currentScript, scripts); + currentScript.pop_back(); + } + } + + void BuildVariantsImpl(const std::vector<std::vector<TString>>& chunks, const ui32 currentLayer, std::vector<TString>& currentCommand, + std::vector<TString>& results) { + if (currentLayer == chunks.size()) { + results.emplace_back(JoinSeq("", currentCommand)); + return; + } + for (auto&& i : chunks[currentLayer]) { + currentCommand.emplace_back(i); + BuildVariantsImpl(chunks, currentLayer + 1, currentCommand, results); + currentCommand.pop_back(); + } + } + std::vector<TString> BuildVariants(const TString& command) { + auto chunks = StringSplitter(command).SplitByString("$$").ToList<TString>(); + std::vector<std::vector<TString>> chunksVariants; + for (ui32 i = 0; i < chunks.size(); ++i) { + if (i % 2 == 0) { + chunksVariants.emplace_back(std::vector<TString>({ chunks[i] })); + } else { + chunksVariants.emplace_back(StringSplitter(chunks[i]).SplitBySet("|").ToList<TString>()); + } + } + std::vector<TString> result; + std::vector<TString> currentCommand; + BuildVariantsImpl(chunksVariants, 0, currentCommand, result); + return result; + } + + public: + TScriptVariator(const TString& script) { + auto commands = StringSplitter(script).SplitByString("------").ToList<TString>(); + std::vector<std::vector<std::shared_ptr<ICommand>>> commandsDescription; + for (auto&& i : commands) { + auto& cVariants = commandsDescription.emplace_back(); + i = Strip(i); + std::vector<TString> variants = BuildVariants(i); + for (auto&& v : variants) { + cVariants.emplace_back(BuildCommand(v)); + } + } + std::vector<TScriptExecutor> scripts; + std::vector<std::shared_ptr<ICommand>> scriptCommands; + BuildScripts(commandsDescription, 0, scriptCommands, Scripts); + } + + void Execute() { + for (auto&& i : Scripts) { + i.Execute(); + } + + } + }; + + Y_UNIT_TEST(EmptyVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$1024|0|1$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES (1u), (2u), (3u), (4u) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES (11u), (12u), (13u), (14u) + ------ + ONE_COMPACTION + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;#];[2u;#];[3u;#];[4u;#];[11u;#];[12u;#];[13u;#];[14u;#]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(EmptyStringVariants) { + TString script = R"( + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5|1$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES (1u, JsonDocument('{"a" : "", "b" : "", "c" : ""}')) + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"\",\"b\":\"\",\"c\":\"\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(FilterVariants) { + TString script = R"( + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2|10$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$1024|0|1$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1", "b" : "b1", "c" : "c1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3", "d" : "d3"}')), (4u, JsonDocument('{"b" : "b4asdsasdaa", "a" : "a4"}')) + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "a2" ORDER BY Col1; + EXPECTED: [[2u;["{\"a\":\"a2\"}"]]] + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"a1\",\"b\":\"b1\",\"c\":\"c1\"}"]];[2u;["{\"a\":\"a2\"}"]];[3u;["{\"b\":\"b3\",\"d\":\"d3\"}"]];[4u;["{\"a\":\"a4\",\"b\":\"b4asdsasdaa\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(SimpleVariants) { + TString script = R"( + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2|10$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$1024|0|1$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4asdsasdaa", "a" : "a4"}')) + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"a1\"}"]];[2u;["{\"a\":\"a2\"}"]];[3u;["{\"b\":\"b3\"}"]];[4u;["{\"a\":\"a4\",\"b\":\"b4asdsasdaa\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(CompactionVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2|10$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')), + (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) + ------ + ONE_COMPACTION + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"a1\"}"]];[2u;["{\"a\":\"a2\"}"]];[3u;["{\"b\":\"b3\"}"]];[4u;["{\"a\":\"a4\",\"b\":\"b4\"}"]];[10u;#]; + [11u;["{\"a\":\"1a1\"}"]];[12u;["{\"a\":\"1a2\"}"]];[13u;["{\"b\":\"1b3\"}"]];[14u;["{\"a\":\"a4\",\"b\":\"1b4\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(SwitchAccessorCompactionVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2|10$$); + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}')) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')), + (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) + ------ + ONE_COMPACTION + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"a1\"}"]];[2u;["{\"a\":\"a2\"}"]];[3u;["{\"b\":\"b3\"}"]];[4u;["{\"a\":\"a4\",\"b\":\"b4\"}"]];[10u;#]; + [11u;["{\"a\":\"1a1\"}"]];[12u;["{\"a\":\"1a2\"}"]];[13u;["{\"b\":\"1b3\"}"]];[14u;["{\"a\":\"a4\",\"b\":\"1b4\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(DuplicationCompactionVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2|10$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "1a1"}')), (2u, JsonDocument('{"a" : "1a2"}')), + (3u, JsonDocument('{"b" : "1b3"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(2u) + ------ + ONE_COMPACTION + ------ + READ: SELECT * FROM `/Root/ColumnTable` ORDER BY Col1; + EXPECTED: [[1u;["{\"a\":\"1a1\"}"]];[2u;#];[3u;["{\"b\":\"1b3\"}"]];[4u;["{\"a\":\"a4\",\"b\":\"b4\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } + +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make index 0de5a59c6a..d548ff20ff 100644 --- a/ydb/core/kqp/ut/olap/ya.make +++ b/ydb/core/kqp/ut/olap/ya.make @@ -20,6 +20,7 @@ SRCS( decimal_ut.cpp delete_ut.cpp indexes_ut.cpp + json_ut.cpp kqp_olap_stats_ut.cpp locks_ut.cpp optimizer_ut.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.cpp b/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.cpp index b198114d0b..2d3a60a439 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.cpp @@ -26,4 +26,5 @@ void TGarbageCollectionActor::CheckFinished() { } } + } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h index bf8da708f3..a30280483c 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h @@ -10,6 +10,10 @@ private: std::shared_ptr<arrow::UInt32Array> RecordIdxArray; public: + ui32 GetRecordsCount() const { + return IdxArray->length(); + } + const arrow::UInt16Array& GetIdxArray() const { return *IdxArray; } @@ -136,6 +140,8 @@ public: void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext); std::vector<TColumnPortionResult> Execute(const TChunkMergeContext& context, TMergingContext& mergeContext) { + const auto& chunk = mergeContext.GetChunk(context.GetBatchIdx()); + AFL_VERIFY(context.GetRecordsCount() == chunk.GetIdxArray().length()); return DoExecute(context, mergeContext); } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp index 6482ee3015..710654fd98 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp @@ -7,4 +7,13 @@ TString TColumnPortionResult::DebugString() const { return TStringBuilder() << "chunks=" << Chunks.size() << ";"; } +ui32 TColumnPortionResult::GetRecordsCount() const { + ui32 result = 0; + for (auto&& i : Chunks) { + AFL_VERIFY(i->GetRecordsCount()); + result += *i->GetRecordsCount(); + } + return result; +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h index 850e1f6eeb..6770797d40 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h @@ -9,6 +9,8 @@ protected: const ui32 ColumnId; public: + ui32 GetRecordsCount() const; + TColumnPortionResult(const ui32 columnId) : ColumnId(columnId) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.cpp index ddcb51e410..c44302a024 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.cpp @@ -159,10 +159,12 @@ void TSparsedMerger::TCursor::InitArrays(const ui32 position) { auto sparsedArray = static_pointer_cast<NArrow::NAccessor::TSparsedArray>(CurrentOwnedArray->GetArray()); SparsedCursor = std::make_shared<TSparsedChunkCursor>(sparsedArray, &*CurrentOwnedArray); PlainCursor = nullptr; - } else { + } else if (CurrentOwnedArray->GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::Array) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("event", "plain_merger"); PlainCursor = make_shared<TPlainChunkCursor>(CurrentOwnedArray->GetArray(), &*CurrentOwnedArray); SparsedCursor = nullptr; + } else { + AFL_VERIFY(false); } AFL_VERIFY(CurrentOwnedArray->GetAddress().GetGlobalStartPosition() <= position); FinishGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + CurrentOwnedArray->GetArray()->GetRecordsCount(); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.cpp new file mode 100644 index 0000000000..002647f17f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.cpp @@ -0,0 +1,5 @@ +#include "builder.h" + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.h b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.h new file mode 100644 index 0000000000..d9a44ca749 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/builder.h @@ -0,0 +1,192 @@ +#pragma once +#include "remap.h" + +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/settings.h> +#include <ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h> +#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +class TMergedBuilder { +private: + using TSparsedBuilder = NArrow::NAccessor::TSparsedArray::TSparsedBuilder<arrow::StringType>; + using TPlainBuilder = NArrow::NAccessor::TTrivialArray::TPlainBuilder<arrow::StringType>; + using TColumnsData = NArrow::NAccessor::NSubColumns::TColumnsData; + using TOthersData = NArrow::NAccessor::NSubColumns::TOthersData; + using TSettings = NArrow::NAccessor::NSubColumns::TSettings; + using TDictStats = NArrow::NAccessor::NSubColumns::TDictStats; + using TSubColumnsArray = NArrow::NAccessor::TSubColumnsArray; + + const TDictStats ResultColumnStats; + const TChunkMergeContext& Context; + std::shared_ptr<TOthersData::TBuilderWithStats> OthersBuilder; + ui32 TotalRecordsCount = 0; + ui32 RecordIndex = 0; + ui32 SumValuesSize = 0; + const TSettings Settings; + const TRemapColumns& Remapper; + + class TGeneralAccessorBuilder { + private: + std::variant<TSparsedBuilder, TPlainBuilder> Builder; + + public: + TGeneralAccessorBuilder(TSparsedBuilder&& builder) + : Builder(std::move(builder)) { + } + + TGeneralAccessorBuilder(TPlainBuilder&& builder) + : Builder(std::move(builder)) { + } + + void AddRecord(const ui32 recordIndex, const std::string_view value) { + struct TVisitor { + private: + const ui32 RecordIndex; + const std::string_view Value; + + public: + void operator()(TSparsedBuilder& builder) const { + builder.AddRecord(RecordIndex, Value); + } + void operator()(TPlainBuilder& builder) const { + builder.AddRecord(RecordIndex, Value); + } + TVisitor(const ui32 recordIndex, const std::string_view value) + : RecordIndex(recordIndex) + , Value(value) { + } + }; + std::visit(TVisitor(recordIndex, value), Builder); + } + std::shared_ptr<NArrow::NAccessor::IChunkedArray> Finish(const ui32 recordsCount) { + struct TVisitor { + private: + const ui32 RecordsCount; + + public: + std::shared_ptr<NArrow::NAccessor::IChunkedArray> operator()(TSparsedBuilder& builder) const { + return builder.Finish(RecordsCount); + } + std::shared_ptr<NArrow::NAccessor::IChunkedArray> operator()(TPlainBuilder& builder) const { + return builder.Finish(RecordsCount); + } + TVisitor(const ui32 recordsCount) + : RecordsCount(recordsCount) { + } + }; + return std::visit(TVisitor(recordsCount), Builder); + } + }; + + std::vector<TGeneralAccessorBuilder> ColumnBuilders; + std::vector<std::vector<std::shared_ptr<TSubColumnsArray>>> Results; + + void FlushData() { + AFL_VERIFY(RecordIndex); + auto portionOthersData = OthersBuilder->Finish(Remapper.BuildRemapInfo(OthersBuilder->GetStatsByKeyIndex(), Settings, RecordIndex)); + std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> arrays; + for (auto&& i : ColumnBuilders) { + arrays.emplace_back(i.Finish(RecordIndex)); + } + TColumnsData cData( + ResultColumnStats, std::make_shared<NArrow::TGeneralContainer>(ResultColumnStats.BuildColumnsSchema()->fields(), std::move(arrays))); + Results.back().emplace_back( + std::make_shared<TSubColumnsArray>(std::move(cData), std::move(portionOthersData), arrow::binary(), RecordIndex, Settings)); + Initialize(); + } + + void Initialize() { + ColumnBuilders.clear(); + for (ui32 i = 0; i < ResultColumnStats.GetColumnsCount(); ++i) { + switch (ResultColumnStats.GetAccessorType(i)) { + case NArrow::NAccessor::IChunkedArray::EType::Array: + ColumnBuilders.emplace_back(TPlainBuilder(0, 0)); + break; + case NArrow::NAccessor::IChunkedArray::EType::SparsedArray: + ColumnBuilders.emplace_back(TSparsedBuilder(nullptr, 0, 0)); + break; + case NArrow::NAccessor::IChunkedArray::EType::Undefined: + case NArrow::NAccessor::IChunkedArray::EType::SerializedChunkedArray: + case NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray: + case NArrow::NAccessor::IChunkedArray::EType::ChunkedArray: + AFL_VERIFY(false); + } + } + OthersBuilder = TOthersData::MakeMergedBuilder(); + RecordIndex = 0; + SumValuesSize = 0; + } + +public: + TMergedBuilder(const NArrow::NAccessor::NSubColumns::TDictStats& columnStats, const TChunkMergeContext& context, const TSettings& settings, + const TRemapColumns& remapper) + : ResultColumnStats(columnStats) + , Context(context) + , OthersBuilder(TOthersData::MakeMergedBuilder()) + , Settings(settings) + , Remapper(remapper) { + Results.emplace_back(); + Initialize(); + } + + class TPortionColumn: public TColumnPortionResult { + public: + void AddChunk(const std::shared_ptr<TSubColumnsArray>& cArray, const TColumnMergeContext& cmContext) { + AFL_VERIFY(cArray); + AFL_VERIFY(cArray->GetRecordsCount()); + auto accContext = cmContext.GetLoader()->BuildAccessorContext(cArray->GetRecordsCount()); + Chunks.emplace_back(std::make_shared<NChunks::TChunkPreparation>(cArray->SerializeToString(accContext), cArray, + TChunkAddress(cmContext.GetColumnId(), Chunks.size()), + cmContext.GetIndexInfo().GetColumnFeaturesVerified(cmContext.GetColumnId()))); + } + }; + + std::vector<TColumnPortionResult> Finish(const TColumnMergeContext& cmContext) { + if (RecordIndex) { + FlushData(); + } + std::vector<TColumnPortionResult> portions; + for (auto&& i : Results) { + TPortionColumn pColumn(cmContext.GetColumnId()); + AFL_VERIFY(i.size()); + for (auto&& p : i) { + pColumn.AddChunk(p, cmContext); + } + portions.emplace_back(std::move(pColumn)); + } + return std::move(portions); + } + + void StartRecord() { + } + void FinishRecord() { + Y_UNUSED(Context); + ++TotalRecordsCount; + ++RecordIndex; + if (TotalRecordsCount == Context.GetPortionRowsCountLimit()) { + TotalRecordsCount = 0; + FlushData(); + Results.emplace_back(); + } else if (SumValuesSize >= Settings.GetChunkMemoryLimit()) { + FlushData(); + } + } + void AddColumnKV(const ui32 commonKeyIndex, const std::string_view value) { + AFL_VERIFY(commonKeyIndex < ColumnBuilders.size()); + ColumnBuilders[commonKeyIndex].AddRecord(RecordIndex, value); + SumValuesSize += value.size(); + } + void AddOtherKV(const ui32 commonKeyIndex, const std::string_view value) { + OthersBuilder->Add(RecordIndex, commonKeyIndex, value); + SumValuesSize += value.size(); + } +}; + +} // namespace NKikimr::NOlap::NCompaction::NSubColumns diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.cpp new file mode 100644 index 0000000000..1d2ebb7822 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.cpp @@ -0,0 +1,5 @@ +#include "iterator.h" + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +} // namespace NKikimr::NOlap::NCompaction::NSubColumns diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.h b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.h new file mode 100644 index 0000000000..042d7a88f8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/iterator.h @@ -0,0 +1,80 @@ +#pragma once +#include "remap.h" + +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/settings.h> +#include <ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h> +#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +class TChunksIterator { +private: + using TReadIteratorOrderedKeys = NArrow::NAccessor::NSubColumns::TReadIteratorOrderedKeys; + using IChunkedArray = NArrow::NAccessor::IChunkedArray; + using TSubColumnsArray = NArrow::NAccessor::TSubColumnsArray; + const std::shared_ptr<IChunkedArray> OriginalArray; + std::optional<IChunkedArray::TFullChunkedArrayAddress> CurrentChunk; + ui32 CurrentChunkStartPosition = 0; + YDB_READONLY_DEF(std::shared_ptr<TSubColumnsArray>, CurrentSubColumnsArray); + std::shared_ptr<TReadIteratorOrderedKeys> DataIterator; + std::shared_ptr<TColumnLoader> Loader; + TRemapColumns& Remapper; + const ui32 SourceIdx; + + void InitArray(const ui32 position) { + if (OriginalArray) { + CurrentChunk = OriginalArray->GetArray(CurrentChunk, position, OriginalArray); + CurrentChunkStartPosition = CurrentChunk->GetAddress().GetGlobalStartPosition(); +// AFL_VERIFY(CurrentChunk->GetAddress().GetLocalIndex(position) == 0)("pos", position)( +// "local", CurrentChunk->GetAddress().GetLocalIndex(position)); + if (CurrentChunk->GetArray()->GetType() == IChunkedArray::EType::SubColumnsArray) { + CurrentSubColumnsArray = std::static_pointer_cast<TSubColumnsArray>(CurrentChunk->GetArray()); + } else { + CurrentSubColumnsArray = std::static_pointer_cast<TSubColumnsArray>( + Loader->GetAccessorConstructor() + ->Construct(CurrentChunk->GetArray(), Loader->BuildAccessorContext(CurrentChunk->GetArray()->GetRecordsCount())) + .DetachResult()); + } + Remapper.StartSourceChunk( + SourceIdx, CurrentSubColumnsArray->GetColumnsData().GetStats(), CurrentSubColumnsArray->GetOthersData().GetStats()); + DataIterator = CurrentSubColumnsArray->BuildOrderedIterator(); + } + } + +public: + TChunksIterator(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& originalArray, const std::shared_ptr<TColumnLoader>& loader, + TRemapColumns& remapper, const ui32 sourceIdx) + : OriginalArray(originalArray) + , Loader(loader) + , Remapper(remapper) + , SourceIdx(sourceIdx) { + } + + void Start() { + InitArray(0); + } + + template <class TStartRecordActor, class TKVActor, class TFinishRecordActor> + void ReadRecord(const ui32 recordIndex, const TStartRecordActor& startRecordActor, const TKVActor& kvActor, + const TFinishRecordActor& finishRecordActor) { + if (!OriginalArray) { + startRecordActor(recordIndex); + finishRecordActor(); + return; + } + AFL_VERIFY(CurrentChunkStartPosition <= recordIndex)("pred", CurrentChunkStartPosition)("record", recordIndex); + if (recordIndex - CurrentChunkStartPosition >= CurrentChunk->GetArray()->GetRecordsCount()) { + InitArray(recordIndex); + } + AFL_VERIFY(CurrentChunk->GetAddress().Contains(recordIndex)); + DataIterator->ReadRecord(recordIndex - CurrentChunkStartPosition, startRecordActor, kvActor, finishRecordActor); + } +}; + +} // namespace NKikimr::NOlap::NCompaction::NSubColumns diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.cpp new file mode 100644 index 0000000000..3be3c01d71 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.cpp @@ -0,0 +1,60 @@ +#include "builder.h" +#include "logic.h" + +#include <ydb/core/formats/arrow/accessor/sub_columns/constructor.h> + +namespace NKikimr::NOlap::NCompaction { + +const TSubColumnsMerger::TSettings& TSubColumnsMerger::GetSettings() const { + return Context.GetLoader()->GetAccessorConstructor().GetObjectPtrVerifiedAs<NArrow::NAccessor::NSubColumns::TConstructor>()->GetSettings(); +} + +void TSubColumnsMerger::DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& /*mergeContext*/) { + ui32 inputRecordsCount = 0; + for (auto&& i : input) { + OrderedIterators.emplace_back(NSubColumns::TChunksIterator(i, Context.GetLoader(), RemapKeyIndex, OrderedIterators.size())); + inputRecordsCount += i ? i->GetRecordsCount() : 0; + } + std::vector<const TDictStats*> stats; + for (auto&& i : OrderedIterators) { + if (i.GetCurrentSubColumnsArray()) { + stats.emplace_back(&i.GetCurrentSubColumnsArray()->GetColumnsData().GetStats()); + stats.emplace_back(&i.GetCurrentSubColumnsArray()->GetOthersData().GetStats()); + } + } + auto commonStats = TDictStats::Merge(stats, GetSettings(), inputRecordsCount); + auto splitted = commonStats.SplitByVolume(GetSettings(), inputRecordsCount); + ResultColumnStats = splitted.ExtractColumns(); + RemapKeyIndex.RegisterColumnStats(*ResultColumnStats); + for (auto&& i : OrderedIterators) { + i.Start(); + } +} + +std::vector<TColumnPortionResult> TSubColumnsMerger::DoExecute(const TChunkMergeContext& context, TMergingContext& mergeContext) { + AFL_VERIFY(ResultColumnStats); + auto& mergeChunkContext = mergeContext.GetChunk(context.GetBatchIdx()); + NSubColumns::TMergedBuilder builder(*ResultColumnStats, context, GetSettings(), RemapKeyIndex); + for (ui32 i = 0; i < context.GetRecordsCount(); ++i) { + const ui32 sourceIdx = mergeChunkContext.GetIdxArray().Value(i); + const ui32 recordIdx = mergeChunkContext.GetRecordIdxArray().Value(i); + const auto startRecord = [&](const ui32 /*sourceRecordIndex*/) { + builder.StartRecord(); + }; + const auto addKV = [&](const ui32 sourceKeyIndex, const std::string_view value, const bool isColumnKey) { + auto commonKeyInfo = RemapKeyIndex.RemapIndex(sourceIdx, sourceKeyIndex, isColumnKey); + if (commonKeyInfo.GetIsColumnKey()) { + builder.AddColumnKV(commonKeyInfo.GetCommonKeyIndex(), value); + } else { + builder.AddOtherKV(commonKeyInfo.GetCommonKeyIndex(), value); + } + }; + const auto finishRecord = [&]() { + builder.FinishRecord(); + }; + OrderedIterators[sourceIdx].ReadRecord(recordIdx, startRecord, addKV, finishRecord); + } + return builder.Finish(Context); +} + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h new file mode 100644 index 0000000000..d5d8478114 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h @@ -0,0 +1,40 @@ +#pragma once +#include "iterator.h" + +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/settings.h> +#include <ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h> +#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NOlap::NCompaction { + +class TSubColumnsMerger: public IColumnMerger { +private: + static inline auto Registrator = TFactory::TRegistrator<TSubColumnsMerger>(NArrow::NAccessor::TGlobalConst::SubColumnsDataAccessorName); + using TBase = IColumnMerger; + using TDictStats = NArrow::NAccessor::NSubColumns::TDictStats; + using TOthersData = NArrow::NAccessor::NSubColumns::TOthersData; + using TColumnsData = NArrow::NAccessor::NSubColumns::TColumnsData; + using TSubColumnsArray = NArrow::NAccessor::TSubColumnsArray; + using TSettings = NArrow::NAccessor::NSubColumns::TSettings; + using TRemapColumns = NKikimr::NOlap::NCompaction::NSubColumns::TRemapColumns; + std::vector<std::shared_ptr<TSubColumnsArray>> Sources; + std::optional<TDictStats> ResultColumnStats; + TRemapColumns RemapKeyIndex; + + const TSettings& GetSettings() const; + std::vector<NSubColumns::TChunksIterator> OrderedIterators; + + virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext) override; + virtual std::vector<TColumnPortionResult> DoExecute(const TChunkMergeContext& context, TMergingContext& mergeContext) override; + +public: + using TBase::TBase; +}; + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.cpp new file mode 100644 index 0000000000..2d06564f06 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.cpp @@ -0,0 +1,62 @@ +#include "remap.h" + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +TRemapColumns::TOthersData::TFinishContext TRemapColumns::BuildRemapInfo( + const std::vector<TDictStats::TRTStatsValue>& statsByKeyIndex, const TSettings& settings, const ui32 recordsCount) const { + TDictStats::TBuilder builder; + std::vector<ui32> remap; + remap.resize(statsByKeyIndex.size(), Max<ui32>()); + ui32 idx = 0; + for (auto&& i : TemporaryKeyIndex) { + if (i.second >= statsByKeyIndex.size()) { + continue; + } + if (!statsByKeyIndex[i.second].GetRecordsCount()) { + continue; + } + builder.Add(i.first, statsByKeyIndex[i.second].GetRecordsCount(), statsByKeyIndex[i.second].GetDataSize(), + settings.IsSparsed(statsByKeyIndex[i.second].GetRecordsCount(), recordsCount) ? NArrow::NAccessor::IChunkedArray::EType::SparsedArray + : NArrow::NAccessor::IChunkedArray::EType::Array); + remap[i.second] = idx++; + } + return TOthersData::TFinishContext(builder.Finish(), remap); +} + +void TRemapColumns::StartSourceChunk(const ui32 sourceIdx, const TDictStats& sourceColumnStats, const TDictStats& sourceOtherStats) { + if (RemapInfo.size() <= sourceIdx) { + RemapInfo.resize((sourceIdx + 1) * 2); + } + RemapInfo[sourceIdx].clear(); + auto& remapSourceInfo = RemapInfo[sourceIdx]; + remapSourceInfo.resize(2); + auto& remapSourceInfoColumns = remapSourceInfo[1]; + AFL_VERIFY(ResultColumnStats); + for (ui32 i = 0; i < sourceColumnStats.GetColumnsCount(); ++i) { + if (remapSourceInfoColumns.size() <= i) { + remapSourceInfoColumns.resize((i + 1) * 2); + } + AFL_VERIFY(!remapSourceInfoColumns[i]); + if (auto commonKeyIndex = ResultColumnStats->GetKeyIndexOptional(sourceColumnStats.GetColumnName(i))) { + remapSourceInfoColumns[i] = TRemapInfo(*commonKeyIndex, true); + } else { + commonKeyIndex = RegisterNewOtherIndex(sourceColumnStats.GetColumnName(i)); + remapSourceInfoColumns[i] = TRemapInfo(*commonKeyIndex, false); + } + } + auto& remapSourceInfoOthers = remapSourceInfo[0]; + for (ui32 i = 0; i < sourceOtherStats.GetColumnsCount(); ++i) { + if (remapSourceInfoOthers.size() <= i) { + remapSourceInfoOthers.resize((i + 1) * 2); + } + AFL_VERIFY(!remapSourceInfoOthers[i]); + if (auto commonKeyIndex = ResultColumnStats->GetKeyIndexOptional(sourceOtherStats.GetColumnName(i))) { + remapSourceInfoOthers[i] = TRemapInfo(*commonKeyIndex, true); + } else { + commonKeyIndex = RegisterNewOtherIndex(sourceOtherStats.GetColumnName(i)); + remapSourceInfoOthers[i] = TRemapInfo(*commonKeyIndex, false); + } + } +} + +} // namespace NKikimr::NOlap::NCompaction::NSubColumns diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.h b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.h new file mode 100644 index 0000000000..de17796f14 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/remap.h @@ -0,0 +1,87 @@ +#pragma once +#include <ydb/core/formats/arrow/accessor/plain/accessor.h> +#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> +#include <ydb/core/formats/arrow/accessor/sub_columns/settings.h> +#include <ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h> +#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h> + +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/accessor/common/const.h> + +namespace NKikimr::NOlap::NCompaction::NSubColumns { + +class TRemapColumns { +private: + using TDictStats = NArrow::NAccessor::NSubColumns::TDictStats; + using TOthersData = NArrow::NAccessor::NSubColumns::TOthersData; + using TSettings = NArrow::NAccessor::NSubColumns::TSettings; + + class TRemapInfo { + private: + YDB_READONLY(ui32, CommonKeyIndex, 0); + YDB_READONLY(bool, IsColumnKey, false); + + public: + TRemapInfo(const ui32 keyIndex, const bool isColumnKey) + : CommonKeyIndex(keyIndex) + , IsColumnKey(isColumnKey) { + } + }; + + class TSourceAddress { + private: + const ui32 SourceIndex; + const ui32 SourceKeyIndex; + const bool IsColumnKey; + + public: + TSourceAddress(const ui32 sourceIndex, const ui32 sourceKeyIndex, const bool isColumnKey) + : SourceIndex(sourceIndex) + , SourceKeyIndex(sourceKeyIndex) + , IsColumnKey(isColumnKey) { + } + + ui32 GetSourceIndex() const { + return SourceIndex; + } + + bool operator<(const TSourceAddress& item) const { + return std::tie(SourceIndex, SourceKeyIndex, IsColumnKey) < std::tie(item.SourceIndex, item.SourceKeyIndex, item.IsColumnKey); + } + }; + + const TDictStats* ResultColumnStats = nullptr; + std::vector<std::vector<std::vector<std::optional<TRemapInfo>>>> RemapInfo; + std::map<TString, ui32> TemporaryKeyIndex; + + ui32 RegisterNewOtherIndex(const TString& keyName) { + return TemporaryKeyIndex.emplace(keyName, TemporaryKeyIndex.size()).first->second; + } + + ui32 RegisterNewOtherIndex(const std::string_view keyName) { + return TemporaryKeyIndex.emplace(TString(keyName.data(), keyName.size()), TemporaryKeyIndex.size()).first->second; + } + +public: + TRemapColumns() = default; + + TOthersData::TFinishContext BuildRemapInfo(const std::vector<TDictStats::TRTStatsValue>& statsByKeyIndex, const TSettings& settings, const ui32 recordsCount) const; + + void RegisterColumnStats(const TDictStats& resultColumnStats) { + ResultColumnStats = &resultColumnStats; + } + + void StartSourceChunk(const ui32 sourceIdx, const TDictStats& sourceColumnStats, const TDictStats& sourceOtherStats); + + TRemapInfo RemapIndex(const ui32 sourceIdx, const ui32 sourceKeyIndex, const bool isColumnKey) const { + AFL_VERIFY(sourceIdx < RemapInfo.size()); + AFL_VERIFY(RemapInfo[sourceIdx].size() == 2); + AFL_VERIFY(sourceKeyIndex < RemapInfo[sourceIdx][isColumnKey ? 1 : 0].size()); + auto result = RemapInfo[sourceIdx][isColumnKey ? 1 : 0][sourceKeyIndex]; + AFL_VERIFY(result); + return *result; + } +}; + +} // namespace NKikimr::NOlap::NCompaction::NSubColumns diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/ya.make new file mode 100644 index 0000000000..f18a5c1990 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + GLOBAL logic.cpp + builder.cpp + remap.cpp + iterator.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/changes/compaction/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make index 5e76aa0d89..54edba8e51 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make @@ -10,6 +10,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/changes/compaction/common ydb/core/tx/columnshard/engines/changes/compaction/plain ydb/core/tx/columnshard/engines/changes/compaction/sparsed + ydb/core/tx/columnshard/engines/changes/compaction/sub_columns ) END() diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 0e2ff74680..a34f602028 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -18,8 +18,7 @@ namespace NKikimr::NOlap { namespace { -void FillDefaultColumn( - TPortionDataAccessor::TColumnAssemblingInfo& column, const TPortionInfo& portionInfo, const TSnapshot& defaultSnapshot) { +void FillDefaultColumn(TPortionDataAccessor::TColumnAssemblingInfo& column, const TPortionInfo& portionInfo, const TSnapshot& defaultSnapshot) { if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) { column.AddBlobInfo(0, portionInfo.GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo( @@ -116,8 +115,8 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent); } -void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, - const TVersionedIndex& index, const THashSet<ui32>& entityIds) const { +void TPortionDataAccessor::FillBlobRangesByStorage( + THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const { auto schema = PortionInfo->GetSchema(index); return FillBlobRangesByStorage(result, schema->GetIndexInfo(), entityIds); } @@ -215,8 +214,8 @@ void TPortionDataAccessor::FillBlobIdsByStorage(THashMap<TString, THashSet<TUnif return FillBlobIdsByStorage(result, schema->GetIndexInfo()); } -THashMap<TString, THashMap<NKikimr::NOlap::TChunkAddress, std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>>> -TPortionDataAccessor::RestoreEntityChunks(NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) const { +THashMap<TString, THashMap<TChunkAddress, std::shared_ptr<IPortionDataChunk>>> TPortionDataAccessor::RestoreEntityChunks( + NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) const { THashMap<TString, THashMap<TChunkAddress, std::shared_ptr<IPortionDataChunk>>> result; for (auto&& c : GetRecordsVerified()) { const TString& storageId = PortionInfo->GetColumnStorageId(c.GetColumnId(), indexInfo); @@ -385,7 +384,8 @@ std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers( return result; } -std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const { +std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages( + const ui64 memoryLimit, const std::set<ui32>& entityIds) const { class TEntityDelimiter { private: YDB_READONLY(ui32, IndexStart, 0); @@ -747,7 +747,7 @@ TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> TPortionDataAcces for (auto& blob : Blobs) { auto chunkedArray = blob.BuildRecordBatch(*Loader); if (chunkedArray.IsFail()) { - return chunkedArray; + return chunkedArray.AddMessageInfo("field: " + GetField()->name()); } builder.AddChunk(chunkedArray.DetachResult()); } @@ -799,7 +799,7 @@ TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> TPortionDataAcces } } else { AFL_VERIFY(ExpectedRowsCount); - return loader.ApplyConclusion(Data, *ExpectedRowsCount); + return loader.ApplyConclusion(Data, *ExpectedRowsCount).AddMessageInfo(::ToString(loader.GetAccessorConstructor()->GetType())); } } @@ -808,13 +808,13 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> TPortionDataAccessor::TP std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> columns; std::vector<std::shared_ptr<arrow::Field>> fields; for (auto&& i : Columns) { -// NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("column_id", i.GetColumnId()); + // NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("column_id", i.GetColumnId()); if (sequentialColumnIds.contains(i.GetColumnId())) { columns.emplace_back(i.AssembleForSeqAccess()); } else { auto conclusion = i.AssembleAccessor(); if (conclusion.IsFail()) { - return conclusion; + return TConclusionStatus::Fail(conclusion.GetErrorMessage() + ";" + i.GetName()); } columns.emplace_back(conclusion.DetachResult()); } diff --git a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp index 2f12570093..34e4578c8e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp @@ -1,24 +1,9 @@ #include "info.h" + #include <ydb/core/tx/columnshard/splitter/abstract/chunks.h> namespace NKikimr::NOlap { -NArrow::NTransformation::ITransformer::TPtr TSimpleColumnInfo::GetSaveTransformer() const { - NArrow::NTransformation::ITransformer::TPtr transformer; - if (DictionaryEncoding) { - transformer = DictionaryEncoding->BuildEncoder(); - } - return transformer; -} - -NArrow::NTransformation::ITransformer::TPtr TSimpleColumnInfo::GetLoadTransformer() const { - NArrow::NTransformation::ITransformer::TPtr transformer; - if (DictionaryEncoding) { - transformer = DictionaryEncoding->BuildDecoder(); - } - return transformer; -} - TConclusionStatus TSimpleColumnInfo::DeserializeFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo) { AFL_VERIFY(columnInfo.GetId() == ColumnId); if (columnInfo.HasSerializer()) { @@ -34,12 +19,7 @@ TConclusionStatus TSimpleColumnInfo::DeserializeFromProto(const NKikimrSchemeOp: } IsNullable = columnInfo.HasNotNull() ? !columnInfo.GetNotNull() : true; AFL_VERIFY(Serializer); - if (columnInfo.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding()); - Y_ABORT_UNLESS(settings.IsSuccess()); - DictionaryEncoding = *settings; - } - Loader = std::make_shared<TColumnLoader>(GetLoadTransformer(), Serializer, DataAccessorConstructor, ArrowField, DefaultValue.GetValue(), ColumnId); + Loader = std::make_shared<TColumnLoader>(Serializer, DataAccessorConstructor, ArrowField, DefaultValue.GetValue(), ColumnId); return TConclusionStatus::Success(); } @@ -53,38 +33,24 @@ TSimpleColumnInfo::TSimpleColumnInfo(const ui32 columnId, const std::shared_ptr< , NeedMinMax(needMinMax) , IsSorted(isSorted) , IsNullable(isNullable) - , DefaultValue(defaultValue) -{ + , DefaultValue(defaultValue) { ColumnName = ArrowField->name(); - Loader = std::make_shared<TColumnLoader>( - GetLoadTransformer(), Serializer, DataAccessorConstructor, ArrowField, DefaultValue.GetValue(), ColumnId); + Loader = std::make_shared<TColumnLoader>(Serializer, DataAccessorConstructor, ArrowField, DefaultValue.GetValue(), ColumnId); } -std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInfo::ActualizeColumnData(const std::vector<std::shared_ptr<IPortionDataChunk>>& source, const TSimpleColumnInfo& sourceColumnFeatures) const { +std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInfo::ActualizeColumnData( + const std::vector<std::shared_ptr<IPortionDataChunk>>& source, const TSimpleColumnInfo& sourceColumnFeatures) const { AFL_VERIFY(Loader); const auto checkNeedActualize = [&]() { if (!Serializer.IsEqualTo(sourceColumnFeatures.Serializer)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "serializer") - ("from", sourceColumnFeatures.Serializer.SerializeToProto().DebugString()) - ("to", Serializer.SerializeToProto().DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "serializer")( + "from", sourceColumnFeatures.Serializer.SerializeToProto().DebugString())("to", Serializer.SerializeToProto().DebugString()); return true; } if (!Loader->IsEqualTo(*sourceColumnFeatures.Loader)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "loader"); return true; } - if (!!DictionaryEncoding != !!sourceColumnFeatures.DictionaryEncoding) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary")( - "from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding); - return true; - } - if (!!DictionaryEncoding && !DictionaryEncoding->IsEqualTo(*sourceColumnFeatures.DictionaryEncoding)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary_encoding") - ("from", sourceColumnFeatures.DictionaryEncoding->SerializeToProto().DebugString()) - ("to", DictionaryEncoding->SerializeToProto().DebugString()) - ; - return true; - } return false; }; if (!checkNeedActualize()) { @@ -92,16 +58,18 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf } std::vector<std::shared_ptr<IPortionDataChunk>> result; for (auto&& s : source) { - std::shared_ptr<arrow::RecordBatch> data; + TString data; + const auto loadContext = Loader->BuildAccessorContext(s->GetRecordsCountVerified()); if (!DataAccessorConstructor.IsEqualTo(sourceColumnFeatures.DataAccessorConstructor)) { auto chunkedArray = sourceColumnFeatures.Loader->ApplyVerified(s->GetData(), s->GetRecordsCountVerified()); - data = DataAccessorConstructor.Construct(chunkedArray, Loader->BuildAccessorContext(s->GetRecordsCountVerified())); + data = DataAccessorConstructor.SerializeToString(DataAccessorConstructor->Construct(chunkedArray, loadContext).DetachResult(), loadContext); } else { - data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData()); + data = DataAccessorConstructor.SerializeToString( + DataAccessorConstructor.DeserializeFromString(s->GetData(), loadContext).DetachResult(), loadContext); } - result.emplace_back(s->CopyWithAnotherBlob(GetColumnSaver().Apply(data), *this)); + result.emplace_back(s->CopyWithAnotherBlob(std::move(data), *this)); } return result; } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/column/info.h b/ydb/core/tx/columnshard/engines/scheme/column/info.h index bc405480ce..a7eaea7933 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column/info.h +++ b/ydb/core/tx/columnshard/engines/scheme/column/info.h @@ -30,9 +30,7 @@ private: YDB_READONLY(bool, IsSorted, false); YDB_READONLY(bool, IsNullable, false); YDB_READONLY_DEF(TColumnDefaultScalarValue, DefaultValue); - std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding; std::shared_ptr<TColumnLoader> Loader; - NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const; public: TSimpleColumnInfo(const ui32 columnId, const std::shared_ptr<arrow::Field>& arrowField, @@ -40,9 +38,8 @@ public: const std::shared_ptr<arrow::Scalar>& defaultValue, const std::optional<ui32>& pkColumnIndex); TColumnSaver GetColumnSaver() const { - NArrow::NTransformation::ITransformer::TPtr transformer = GetSaveTransformer(); AFL_VERIFY(Serializer); - return TColumnSaver(transformer, Serializer); + return TColumnSaver(Serializer); } std::vector<std::shared_ptr<IPortionDataChunk>> ActualizeColumnData( @@ -51,12 +48,10 @@ public: TString DebugString() const { TStringBuilder sb; sb << "serializer=" << (Serializer ? Serializer->DebugString() : "NO") << ";"; - sb << "encoding=" << (DictionaryEncoding ? DictionaryEncoding->DebugString() : "NO") << ";"; sb << "loader=" << (Loader ? Loader->DebugString() : "NO") << ";"; return sb; } - NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const; TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo); const std::shared_ptr<TColumnLoader>& GetLoader() const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 9caaad2c0b..71b2cd73d3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -328,13 +328,15 @@ TConclusion<TWritePortionInfoWithBlobsResult> ISnapshotSchema::PrepareForWrite(c saver.AddSerializerWithBorder(100000000, NArrow::NSerialization::TNativeSerializer::GetFast()); const auto& columnFeatures = GetIndexInfo().GetColumnFeaturesVerified(columnId); auto accessor = std::make_shared<NArrow::NAccessor::TTrivialArray>(incomingBatch->column(incomingIndex)); - std::shared_ptr<arrow::RecordBatch> rbToWrite = + TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> arrToWrite = loader->GetAccessorConstructor()->Construct(accessor, loader->BuildAccessorContext(accessor->GetRecordsCount())); - std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrToWrite = - loader->GetAccessorConstructor()->Construct(rbToWrite, loader->BuildAccessorContext(accessor->GetRecordsCount())).DetachResult(); + if (arrToWrite.IsFail()) { + return arrToWrite; + } std::vector<std::shared_ptr<IPortionDataChunk>> columnChunks = { std::make_shared<NChunks::TChunkPreparation>( - saver.Apply(rbToWrite), arrToWrite, TChunkAddress(columnId, 0), columnFeatures) }; + loader->GetAccessorConstructor()->SerializeToString(*arrToWrite, loader->BuildAccessorContext(accessor->GetRecordsCount())), + *arrToWrite, TChunkAddress(columnId, 0), columnFeatures) }; AFL_VERIFY(chunks.emplace(columnId, std::move(columnChunks)).second); ++itIncoming; } diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/column.cpp b/ydb/core/tx/columnshard/engines/storage/chunks/column.cpp index 4a527f913f..6b44fb1845 100644 --- a/ydb/core/tx/columnshard/engines/storage/chunks/column.cpp +++ b/ydb/core/tx/columnshard/engines/storage/chunks/column.cpp @@ -4,9 +4,12 @@ namespace NKikimr::NOlap::NChunks { std::vector<std::shared_ptr<IPortionDataChunk>> TChunkPreparation::DoInternalSplitImpl( - const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/, const std::vector<ui64>& splitSizes) const { + const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/, const std::vector<ui64>& splitSizes) const { auto accessor = ColumnInfo.GetLoader()->ApplyVerified(Data, GetRecordsCountVerified()); - std::vector<NArrow::NAccessor::TChunkedArraySerialized> chunks = accessor->SplitBySizes(saver, Data, splitSizes); + const auto predSaver = [&](const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& arr) { + return ColumnInfo.GetLoader()->GetAccessorConstructor().SerializeToString(arr, ColumnInfo.GetLoader()->BuildAccessorContext(arr->GetRecordsCount())); + }; + std::vector<NArrow::NAccessor::TChunkedArraySerialized> chunks = accessor->SplitBySizes(predSaver, Data, splitSizes); std::vector<std::shared_ptr<IPortionDataChunk>> newChunks; for (auto&& i : chunks) { diff --git a/ydb/core/tx/columnshard/export/actor/export_actor.cpp b/ydb/core/tx/columnshard/export/actor/export_actor.cpp index 060a09e56e..b2db46ad79 100644 --- a/ydb/core/tx/columnshard/export/actor/export_actor.cpp +++ b/ydb/core/tx/columnshard/export/actor/export_actor.cpp @@ -8,7 +8,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) { auto data = ev->Get()->ArrowBatch; AFL_VERIFY(!!data || ev->Get()->Finished); if (data) { - CurrentData = NArrow::ToBatch(data, true); + CurrentData = NArrow::ToBatch(data); CurrentDataBlob = ExportSession->GetTask().GetSerializer()->SerializeFull(CurrentData); if (data) { auto controller = std::make_shared<TWriteController>(SelfId(), std::vector<TString>({CurrentDataBlob}), diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 8167424c8d..4cbe1f01ca 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -263,6 +263,11 @@ public: DisabledBackgrounds.emplace(id); } + bool IsBackgroundEnable(const EBackground id) { + TGuard<TMutex> g(Mutex); + return !DisabledBackgrounds.contains(id); + } + void EnableBackground(const EBackground id) { TGuard<TMutex> g(Mutex); DisabledBackgrounds.erase(id); diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 2d7a3abfed..59086f164f 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -172,7 +172,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta Context.GetActualSchema() ->NormalizeBatch(*Context.GetActualSchema(), std::make_shared<NArrow::TGeneralContainer>(OriginalBatch), columnIdsSet) .DetachResult(); - OriginalBatch = NArrow::ToBatch(normalized->BuildTableVerified(), true); + OriginalBatch = NArrow::ToBatch(normalized->BuildTableVerified()); } } auto batches = BuildSlices(); diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index 1e33b4bf97..c39be08b92 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -161,7 +161,8 @@ TGeneralSerializedSlice::TGeneralSerializedSlice(const THashMap<ui32, std::vecto if (!recordsCount) { recordsCount = entity.GetRecordsCount(); } else { - AFL_VERIFY(*recordsCount == entity.GetRecordsCount())("records_count", *recordsCount)("column", entity.GetRecordsCount()); + AFL_VERIFY(*recordsCount == entity.GetRecordsCount())("records_count", *recordsCount)("column", entity.GetRecordsCount())( + "chunks", chunks.size()); } } Size += entity.GetSize(); diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index fc3d44c286..d98c37fe6b 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -26,7 +26,7 @@ Y_UNIT_TEST_SUITE(Splitter) { protected: virtual NKikimr::NArrow::NAccessor::TColumnSaver DoGetColumnSaver(const ui32 columnId) const override { return NKikimr::NArrow::NAccessor::TColumnSaver( - nullptr, std::make_shared<NSerialization::TNativeSerializer>(arrow::ipc::IpcOptions::Defaults())); + std::make_shared<NSerialization::TNativeSerializer>(arrow::ipc::IpcOptions::Defaults())); } public: @@ -40,7 +40,7 @@ Y_UNIT_TEST_SUITE(Splitter) { } NKikimr::NArrow::NAccessor::TColumnLoader GetColumnLoader(const ui32 columnId) const { - return NKikimr::NArrow::NAccessor::TColumnLoader(nullptr, NSerialization::TSerializerContainer::GetDefaultSerializer(), + return NKikimr::NArrow::NAccessor::TColumnLoader(NSerialization::TSerializerContainer::GetDefaultSerializer(), NKikimr::NArrow::NAccessor::TConstructorContainer::GetDefaultConstructor(), GetField(columnId), nullptr, columnId); } @@ -139,9 +139,12 @@ Y_UNIT_TEST_SUITE(Splitter) { ui32 count = 0; for (auto&& c : e.second) { auto slice = arr->Slice(count + portionShift, c->GetRecordsCountVerified()); - auto readBatch = Schema->GetColumnLoader(e.first).ApplyRawVerified(c->GetData()); - AFL_VERIFY(slice->length() == readBatch->num_rows()); - Y_ABORT_UNLESS(readBatch->column(0)->RangeEquals(*slice, 0, readBatch->num_rows(), 0, arrow::EqualOptions::Defaults())); + auto readBatchArray = Schema->GetColumnLoader(e.first).ApplyVerified(c->GetData(), c->GetRecordsCountVerified()); + std::shared_ptr<arrow::ChunkedArray> chunkedArray = readBatchArray->GetChunkedArray(); + AFL_VERIFY(chunkedArray->num_chunks() == 1); + AFL_VERIFY(slice->length() == chunkedArray->length()); + Y_ABORT_UNLESS( + chunkedArray->chunk(0)->RangeEquals(*slice, 0, chunkedArray->length(), 0, arrow::EqualOptions::Defaults())); count += c->GetRecordsCountVerified(); AFL_VERIFY(entitiesByRecordsCount[count].emplace(e.first).second); AFL_VERIFY(entitiesByRecordsCount[count].size() <= (ui32)batch->num_columns()); diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.h b/ydb/core/tx/columnshard/test_helper/shard_reader.h index eb9f041062..4f31de43db 100644 --- a/ydb/core/tx/columnshard/test_helper/shard_reader.h +++ b/ydb/core/tx/columnshard/test_helper/shard_reader.h @@ -125,7 +125,7 @@ public: if (auto* evData = std::get<0>(event)) { auto b = evData->ArrowBatch; if (b) { - ResultBatches.push_back(NArrow::ToBatch(b, true)); + ResultBatches.push_back(NArrow::ToBatch(b)); NArrow::TStatusValidator::Validate(ResultBatches.back()->ValidateFull()); } else { AFL_VERIFY(evData->Finished); diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.cpp b/ydb/core/tx/columnshard/transactions/operators/schema.cpp index 03f310ad71..9f5a72a5b9 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/schema.cpp @@ -171,8 +171,7 @@ void TSchemaTransactionOperator::DoOnTabletInit(TColumnShard& owner) { case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: { for (auto&& i : SchemaTxBody.GetEnsureTables().GetTables()) { - AFL_VERIFY(!owner.TablesManager.HasTable(i.GetPathId())); - if (owner.TablesManager.HasTable(i.GetPathId(), true)) { + if (owner.TablesManager.HasTable(i.GetPathId(), true) && !owner.TablesManager.HasTable(i.GetPathId())) { WaitPathIdsToErase.emplace(i.GetPathId()); } } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index d759a5198c..08a33d4901 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2253,7 +2253,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { break; } UNIT_ASSERT(scan->ArrowBatch); - auto batchStats = NArrow::ToBatch(scan->ArrowBatch, true); + auto batchStats = NArrow::ToBatch(scan->ArrowBatch); for (ui32 i = 0; i < batchStats->num_rows(); ++i) { auto paths = batchStats->GetColumnByName("PathId"); auto kinds = batchStats->GetColumnByName("Kind"); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index d76d79b3bc..e862ecf4d5 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -89,7 +89,7 @@ public: for (auto&& key : portion2Key) { NKikimrTxColumnShard::TIndexColumnMeta metaProto; UNIT_ASSERT(metaProto.ParseFromArray(key.Metadata.data(), key.Metadata.size())); - metaProto.ClearNumRows(); +// metaProto.ClearNumRows(); metaProto.ClearRawBytes(); db.Table<Schema::IndexColumns>() diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index 81cde3ebe3..e35b7cda96 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -255,38 +255,38 @@ NSsa::TAssign TProgramBuilder::MakeConstant(const NSsa::TColumnInfo& name, const switch (constant.GetValueCase()) { case TId::kBool: - return TAssign(name, constant.GetBool()); + return TAssign(name, std::make_shared<arrow::BooleanScalar>(constant.GetBool())); case TId::kInt8: - return TAssign(name, i8(constant.GetInt8())); + return TAssign(name, std::make_shared<arrow::Int8Scalar>(i8(constant.GetInt8()))); case TId::kUint8: - return TAssign(name, ui8(constant.GetUint8())); + return TAssign(name, std::make_shared<arrow::UInt8Scalar>(ui8(constant.GetUint8()))); case TId::kInt16: - return TAssign(name, i16(constant.GetInt16())); + return TAssign(name, std::make_shared<arrow::Int16Scalar>(i16(constant.GetInt16()))); case TId::kUint16: - return TAssign(name, ui16(constant.GetUint16())); + return TAssign(name, std::make_shared<arrow::UInt16Scalar>(ui16(constant.GetUint16()))); case TId::kInt32: - return TAssign(name, constant.GetInt32()); + return TAssign(name, std::make_shared<arrow::Int32Scalar>(constant.GetInt32())); case TId::kUint32: - return TAssign(name, constant.GetUint32()); + return TAssign(name, std::make_shared<arrow::UInt32Scalar>(constant.GetUint32())); case TId::kInt64: - return TAssign(name, constant.GetInt64()); + return TAssign(name, std::make_shared<arrow::Int64Scalar>(constant.GetInt64())); case TId::kUint64: - return TAssign(name, constant.GetUint64()); + return TAssign(name, std::make_shared<arrow::UInt64Scalar>(constant.GetUint64())); case TId::kFloat: - return TAssign(name, constant.GetFloat()); + return TAssign(name, std::make_shared<arrow::FloatScalar>(constant.GetFloat())); case TId::kDouble: - return TAssign(name, constant.GetDouble()); + return TAssign(name, std::make_shared<arrow::DoubleScalar>(constant.GetDouble())); case TId::kTimestamp: return TAssign::MakeTimestamp(name, constant.GetTimestamp()); case TId::kBytes: { TString str = constant.GetBytes(); - return TAssign(name, std::string(str.data(), str.size()), true); + return TAssign(name, std::make_shared<arrow::BinaryScalar>(std::make_shared<arrow::Buffer>((const ui8*)str.data(), str.size()), arrow::binary())); } case TId::kText: { TString str = constant.GetText(); - return TAssign(name, std::string(str.data(), str.size()), false); + return TAssign(name, std::make_shared<arrow::StringScalar>(std::string(str.data(), str.size()))); } case TId::VALUE_NOT_SET: break; @@ -304,7 +304,7 @@ NSsa::TAggregateAssign TProgramBuilder::MakeAggregate(const NSsa::TColumnInfo& n Error = TStringBuilder() << "Unknown kernel for " << func.GetId() << ";kernel_idx=" << func.GetKernelIdx(); return TAggregateAssign(name); } - return TAggregateAssign(name, kernelFunction, {argument}); + return TAggregateAssign(name, kernelFunction, { argument }); } if (func.ArgumentsSize() == 1) { diff --git a/ydb/library/conclusion/result.h b/ydb/library/conclusion/result.h index 2839bb18cd..fde5328d09 100644 --- a/ydb/library/conclusion/result.h +++ b/ydb/library/conclusion/result.h @@ -6,6 +6,19 @@ namespace NKikimr { template <typename TResult> -using TConclusion = TConclusionImpl<TConclusionStatus, TResult>; +class TConclusion: public TConclusionImpl<TConclusionStatus, TResult> { +private: + using TBase = TConclusionImpl<TConclusionStatus, TResult>; +public: + using TBase::TBase; + + TConclusion<TResult> AddMessageInfo(const TString& info) const { + if (TBase::IsSuccess()) { + return *this; + } else { + return TConclusionStatus::Fail(TBase::GetErrorMessage() + "; " + info); + } + } +}; } // namespace NKikimr diff --git a/ydb/library/formats/arrow/accessor/abstract/accessor.cpp b/ydb/library/formats/arrow/accessor/abstract/accessor.cpp index c58ed0e283..73a2ab18c0 100644 --- a/ydb/library/formats/arrow/accessor/abstract/accessor.cpp +++ b/ydb/library/formats/arrow/accessor/abstract/accessor.cpp @@ -1,13 +1,12 @@ #include "accessor.h" +#include <ydb/library/actors/core/log.h> #include <ydb/library/formats/arrow/arrow_helpers.h> #include <ydb/library/formats/arrow/permutations.h> #include <ydb/library/formats/arrow/size_calcer.h> #include <ydb/library/formats/arrow/switch/compare.h> #include <ydb/library/formats/arrow/switch/switch_type.h> -#include <ydb/library/actors/core/log.h> - namespace NKikimr::NArrow::NAccessor { void IChunkedArray::TReader::AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const { @@ -43,9 +42,9 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con return std::make_shared<arrow::ChunkedArray>(chunks, DataType); } -NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk( - const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const { - AFL_VERIFY(position < GetRecordsCount())("pos", position)("records", GetRecordsCount())("current", chunkCurrent ? chunkCurrent->DebugString() : Default<TString>()); +IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const { + AFL_VERIFY(position < GetRecordsCount())("pos", position)("records", GetRecordsCount())( + "current", chunkCurrent ? chunkCurrent->DebugString() : Default<TString>()); std::optional<TCommonChunkAddress> address; if (IsDataOwner()) { @@ -61,7 +60,7 @@ NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetCh auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr); if (chunkCurrent) { AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())( - "chunked", chunkedArrayAddress.GetAddress().GetSize()); + "chunked", chunkedArrayAddress.GetAddress().GetSize()); } auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position)); auto fullAddress = std::move(chunkedArrayAddress.MutableAddress()); diff --git a/ydb/library/formats/arrow/accessor/abstract/accessor.h b/ydb/library/formats/arrow/accessor/abstract/accessor.h index 6507201cb8..fd3aba9636 100644 --- a/ydb/library/formats/arrow/accessor/abstract/accessor.h +++ b/ydb/library/formats/arrow/accessor/abstract/accessor.h @@ -1,16 +1,23 @@ #pragma once + #include <ydb/library/accessor/accessor.h> #include <ydb/library/accessor/validator.h> +#include <ydb/library/formats/arrow/splitter/similar_packer.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> +#include <library/cpp/json/writer/json_value.h> #include <util/string/builder.h> +namespace NKikimr::NArrow::NSerialization { +class ISerializer; +} + namespace NKikimr::NArrow::NAccessor { -class TColumnSaver; +class TColumnLoader; class IChunkedArray; class TChunkedArraySerialized { @@ -24,12 +31,13 @@ public: class IChunkedArray { public: - enum class EType { - Undefined, + enum class EType : ui8 { + Undefined = 0, Array, ChunkedArray, SerializedChunkedArray, - SparsedArray + SparsedArray, + SubColumnsArray }; class TCommonChunkAddress { @@ -105,8 +113,7 @@ public: TString DebugString() const { TStringBuilder sb; - sb << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition - << ";addresses_count=" << Addresses.size() << ";"; + sb << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition << ";addresses_count=" << Addresses.size() << ";"; for (auto&& i : Addresses) { sb << "addresses=" << i.DebugString() << ";"; } @@ -122,8 +129,7 @@ public: public: TFullChunkedArrayAddress(const std::shared_ptr<IChunkedArray>& arr, TAddressChain&& address) : Array(arr) - , Address(std::move(address)) - { + , Address(std::move(address)) { AFL_VERIFY(Address.GetSize()); AFL_VERIFY(Array); AFL_VERIFY(Array->GetRecordsCount()); @@ -167,8 +173,7 @@ public: TFullDataAddress(const std::shared_ptr<arrow::Array>& arr, TAddressChain&& address) : Array(arr) - , Address(std::move(address)) - { + , Address(std::move(address)) { AFL_VERIFY(Array); AFL_VERIFY(Address.GetSize()); } @@ -186,8 +191,7 @@ public: TLocalDataAddress(const std::shared_ptr<arrow::Array>& arr, const ui32 start, const ui32 chunkIdx) : Array(arr) - , Address(start, start + TValidator::CheckNotNull(arr)->length(), chunkIdx) - { + , Address(start, start + TValidator::CheckNotNull(arr)->length(), chunkIdx) { } TLocalDataAddress(const std::shared_ptr<arrow::Array>& arr, const TCommonChunkAddress& address) @@ -213,8 +217,7 @@ public: TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position) : Array(arr) - , Position(position) - { + , Position(position) { AFL_VERIFY(!!Array); AFL_VERIFY(position < (ui32)Array->length()); } @@ -226,23 +229,34 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, DataType); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(EType, Type, EType::Undefined); + virtual NJson::TJsonValue DoDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("data", GetChunkedArray()->ToString()); + return result; + } virtual std::optional<ui64> DoGetRawSize() const = 0; virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const = 0; - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const = 0; + virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( + const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const { + AFL_VERIFY(false); + return TLocalChunkedArrayAddress(nullptr, 0, 0); + } virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const = 0; + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const = 0; + virtual ui32 DoGetNullsCount() const = 0; + virtual ui32 DoGetValueRawBytes() const = 0; protected: - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const = 0; + std::shared_ptr<arrow::Schema> GetArraySchema() const { + const arrow::FieldVector fields = { std::make_shared<arrow::Field>("val", GetDataType()) }; + return std::make_shared<arrow::Schema>(fields); + } + TLocalChunkedArrayAddress GetLocalChunkedArray(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const { return DoGetLocalChunkedArray(chunkCurrent, position); } - TLocalDataAddress GetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const { - return DoGetLocalData(chunkCurrent, position); - } virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const = 0; - virtual std::vector<TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) = 0; template <class TCurrentPosition, class TChunkAccessor> void SelectChunk(const std::optional<TCurrentPosition>& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const { @@ -251,8 +265,7 @@ protected: ui64 idx = 0; if (chunkCurrent) { if (position < chunkCurrent->GetFinishPosition()) { - return accessor.OnArray( - chunkCurrent->GetChunkIndex(), chunkCurrent->GetStartPosition()); + return accessor.OnArray(chunkCurrent->GetChunkIndex(), chunkCurrent->GetStartPosition()); } else if (position == chunkCurrent->GetFinishPosition() && chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount()) { return accessor.OnArray(chunkCurrent->GetChunkIndex() + 1, position); } @@ -295,11 +308,32 @@ protected: chunkCurrentInfo << chunkCurrent->DebugString(); } AFL_VERIFY(recordsCountChunks == GetRecordsCount())("pos", position)("count", GetRecordsCount())("chunks_map", sb)( - "chunk_current", chunkCurrentInfo); + "chunk_current", chunkCurrentInfo)("chunks_count", recordsCountChunks); AFL_VERIFY(false)("pos", position)("count", GetRecordsCount())("chunks_map", sb)("chunk_current", chunkCurrentInfo); } public: + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", ::ToString(Type)); + result.InsertValue("records_count", RecordsCount); + result.InsertValue("internal", DoDebugJson()); + return result; + } + + ui32 GetNullsCount() const { + return DoGetNullsCount(); + } + + ui32 GetValueRawBytes() const { + return DoGetValueRawBytes(); + } + + TLocalDataAddress GetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const { + AFL_VERIFY(position < GetRecordsCount())("position", position)("records_count", GetRecordsCount()); + return DoGetLocalData(chunkCurrent, position); + } + class TReader { private: std::shared_ptr<IChunkedArray> ChunkedArray; @@ -328,9 +362,18 @@ public: return DoGetScalar(index); } + template <class TSerializer> std::vector<TChunkedArraySerialized> SplitBySizes( - const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) { - return DoSplitBySizes(saver, fullSerializedData, splitSizes); + const TSerializer& serialize, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) { + const std::vector<ui32> recordsCount = NSplitter::TSimilarPacker::SizesToRecordsCount(GetRecordsCount(), fullSerializedData, splitSizes); + std::vector<TChunkedArraySerialized> result; + ui32 currentStartIndex = 0; + for (auto&& i : recordsCount) { + std::shared_ptr<IChunkedArray> slice = ISlice(currentStartIndex, i); + result.emplace_back(slice, serialize(slice)); + currentStartIndex += i; + } + return result; } std::shared_ptr<arrow::Scalar> GetMaxScalar() const { @@ -348,17 +391,29 @@ public: return *result; } - std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const { - return DoGetChunkedArray(); + virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const { + std::vector<std::shared_ptr<arrow::Array>> chunks; + std::optional<TFullDataAddress> address; + for (ui32 position = 0; position < GetRecordsCount();) { + address = GetChunk(address, position); + chunks.emplace_back(address->GetArray()); + position += address->GetArray()->length(); + } + return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType()); } virtual ~IChunkedArray() = default; std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const; + std::shared_ptr<IChunkedArray> ISlice(const ui32 offset, const ui32 count) const { + AFL_VERIFY(offset + count <= GetRecordsCount())("offset", offset)("count", count)("records", GetRecordsCount()); + return DoISlice(offset, count); + } bool IsDataOwner() const { switch (Type) { case EType::SparsedArray: case EType::ChunkedArray: + case EType::SubColumnsArray: case EType::Array: return true; case EType::Undefined: @@ -388,7 +443,7 @@ public: if (chunkCurrent) { return GetArray(chunkCurrent->GetAddress(), position, selfPtr); } else { - return GetArray(std::optional<TAddressChain>(), position, selfPtr); + return GetArraySlow(position, selfPtr); } } diff --git a/ydb/library/formats/arrow/accessor/abstract/ya.make b/ydb/library/formats/arrow/accessor/abstract/ya.make index 03f6c83afa..2e3d8103a4 100644 --- a/ydb/library/formats/arrow/accessor/abstract/ya.make +++ b/ydb/library/formats/arrow/accessor/abstract/ya.make @@ -12,4 +12,6 @@ SRCS( accessor.cpp ) +GENERATE_ENUM_SERIALIZATION(accessor.h) + END() diff --git a/ydb/library/formats/arrow/accessor/common/chunk_data.cpp b/ydb/library/formats/arrow/accessor/common/chunk_data.cpp index da03037ef3..2356af1bd4 100644 --- a/ydb/library/formats/arrow/accessor/common/chunk_data.cpp +++ b/ydb/library/formats/arrow/accessor/common/chunk_data.cpp @@ -1,5 +1,18 @@ #include "chunk_data.h" +#include <ydb/library/actors/core/log.h> + namespace NKikimr::NArrow::NAccessor { +TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue, + const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer) + : RecordsCount(recordsCount) + , DefaultValue(defaultValue) + , ColumnType(columnType) + , DefaultSerializer(defaultSerializer) { + AFL_VERIFY(ColumnType); + AFL_VERIFY(RecordsCount); + AFL_VERIFY(!!DefaultSerializer); } + +} // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/library/formats/arrow/accessor/common/chunk_data.h b/ydb/library/formats/arrow/accessor/common/chunk_data.h index d10d27abb8..ccbb318faa 100644 --- a/ydb/library/formats/arrow/accessor/common/chunk_data.h +++ b/ydb/library/formats/arrow/accessor/common/chunk_data.h @@ -1,7 +1,12 @@ #pragma once #include <ydb/library/accessor/accessor.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> + +namespace NKikimr::NArrow::NSerialization { +class ISerializer; +} namespace NKikimr::NArrow::NAccessor { @@ -10,14 +15,11 @@ private: YDB_READONLY(ui32, RecordsCount, 0); YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue); YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, ColumnType); + YDB_READONLY_DEF(std::shared_ptr<NSerialization::ISerializer>, DefaultSerializer); public: - TChunkConstructionData( - const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& columnType) - : RecordsCount(recordsCount) - , DefaultValue(defaultValue) - , ColumnType(columnType) { - } + TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue, + const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer); }; } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/library/formats/arrow/accessor/common/const.h b/ydb/library/formats/arrow/accessor/common/const.h index 1923328544..3de41402c0 100644 --- a/ydb/library/formats/arrow/accessor/common/const.h +++ b/ydb/library/formats/arrow/accessor/common/const.h @@ -6,6 +6,7 @@ namespace NKikimr::NArrow::NAccessor { class TGlobalConst { public: static const inline TString SparsedDataAccessorName = "SPARSED"; + static const inline TString SubColumnsDataAccessorName = "SUB_COLUMNS"; static const inline TString PlainDataAccessorName = "PLAIN"; }; diff --git a/ydb/library/formats/arrow/accessor/common/ya.make b/ydb/library/formats/arrow/accessor/common/ya.make index f7ecec94f2..9f2c4e95b8 100644 --- a/ydb/library/formats/arrow/accessor/common/ya.make +++ b/ydb/library/formats/arrow/accessor/common/ya.make @@ -2,6 +2,7 @@ LIBRARY(library-formats-arrow-accessor-common) PEERDIR( contrib/libs/apache/arrow + ydb/library/actors/core ) SRCS( diff --git a/ydb/library/formats/arrow/accessor/composite/accessor.cpp b/ydb/library/formats/arrow/accessor/composite/accessor.cpp index 5660aaccc9..85e4396b99 100644 --- a/ydb/library/formats/arrow/accessor/composite/accessor.cpp +++ b/ydb/library/formats/arrow/accessor/composite/accessor.cpp @@ -25,8 +25,37 @@ public: } } }; + } // namespace +std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoISlice(const ui32 offset, const ui32 count) const { + ui32 slicedRecordsCount = 0; + ui32 currentIndex = offset; + std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress; + std::vector<std::shared_ptr<IChunkedArray>> chunks; + while (slicedRecordsCount < count && currentIndex < GetRecordsCount()) { + arrAddress = GetArray(arrAddress, currentIndex, nullptr); + const ui32 localIndex = arrAddress->GetAddress().GetLocalIndex(currentIndex); + const ui32 localCount = (arrAddress->GetArray()->GetRecordsCount() + slicedRecordsCount < count) + ? arrAddress->GetArray()->GetRecordsCount() + : (count - slicedRecordsCount); + + if (localIndex == 0 && localCount == arrAddress->GetArray()->GetRecordsCount()) { + chunks.emplace_back(arrAddress->GetArray()); + } else { + chunks.emplace_back(arrAddress->GetArray()->ISlice(localIndex, localCount)); + } + slicedRecordsCount += localCount; + currentIndex += localCount; + } + AFL_VERIFY(slicedRecordsCount == count)("sliced", slicedRecordsCount)("count", count); + if (chunks.size() == 1) { + return chunks.front(); + } else { + return std::make_shared<TCompositeChunkedArray>(std::move(chunks), count, GetDataType()); + } +} + IChunkedArray::TLocalDataAddress TCompositeChunkedArray::DoGetLocalData( const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const { AFL_VERIFY(false); @@ -42,16 +71,4 @@ IChunkedArray::TLocalChunkedArrayAddress TCompositeChunkedArray::DoGetLocalChunk return *result; } -std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::DoGetChunkedArray() const { - std::vector<std::shared_ptr<arrow::Array>> chunks; - for (auto&& i : Chunks) { - auto arr = i->GetChunkedArray(); - AFL_VERIFY(arr->num_chunks()); - for (auto&& chunk : arr->chunks()) { - chunks.emplace_back(chunk); - } - } - return std::make_shared<arrow::ChunkedArray>(chunks); -} - } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/library/formats/arrow/accessor/composite/accessor.h b/ydb/library/formats/arrow/accessor/composite/accessor.h index a86c36025d..62fad4d27e 100644 --- a/ydb/library/formats/arrow/accessor/composite/accessor.h +++ b/ydb/library/formats/arrow/accessor/composite/accessor.h @@ -1,26 +1,38 @@ #pragma once -#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> #include <ydb/library/accessor/accessor.h> +#include <ydb/library/formats/arrow/accessor/abstract/accessor.h> namespace NKikimr::NArrow::NAccessor { -class TCompositeChunkedArray: public NArrow::NAccessor::IChunkedArray { +class ICompositeChunkedArray: public NArrow::NAccessor::IChunkedArray { private: using TBase = NArrow::NAccessor::IChunkedArray; + virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override final; + +public: + using TBase::TBase; +}; +class TCompositeChunkedArray: public ICompositeChunkedArray { private: - std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Chunks; + using TBase = ICompositeChunkedArray; -protected: - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( - const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; +private: + std::vector<std::shared_ptr<IChunkedArray>> Chunks; - virtual std::vector<NArrow::NAccessor::TChunkedArraySerialized> DoSplitBySizes( - const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override { +protected: + virtual ui32 DoGetNullsCount() const override { AFL_VERIFY(false); - return {}; + return 0; + } + virtual ui32 DoGetValueRawBytes() const override { + AFL_VERIFY(false); + return 0; } + virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( + const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; + virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override { AFL_VERIFY(false)("problem", "cannot use method"); return nullptr; @@ -33,21 +45,20 @@ protected: return nullptr; } virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override; - virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override; +public: TCompositeChunkedArray(std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>&& chunks, const ui32 recordsCount, const std::shared_ptr<arrow::DataType>& type) : TBase(recordsCount, NArrow::NAccessor::IChunkedArray::EType::SerializedChunkedArray, type) , Chunks(std::move(chunks)) { } -public: class TBuilder { private: ui32 RecordsCount = 0; std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Chunks; const std::shared_ptr<arrow::DataType> Type; - + bool Finished = false; public: TBuilder(const std::shared_ptr<arrow::DataType>& type) : Type(type) { @@ -55,12 +66,15 @@ public: } void AddChunk(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& arr) { + AFL_VERIFY(!Finished); AFL_VERIFY(arr->GetDataType()->id() == Type->id())("incoming", arr->GetDataType()->ToString())("main", Type->ToString()); Chunks.emplace_back(arr); RecordsCount += arr->GetRecordsCount(); } std::shared_ptr<TCompositeChunkedArray> Finish() { + AFL_VERIFY(!Finished); + Finished = true; return std::shared_ptr<TCompositeChunkedArray>(new TCompositeChunkedArray(std::move(Chunks), RecordsCount, Type)); } }; diff --git a/ydb/library/formats/arrow/arrow_helpers.cpp b/ydb/library/formats/arrow/arrow_helpers.cpp index 78fa0c3e71..c84df8da12 100644 --- a/ydb/library/formats/arrow/arrow_helpers.cpp +++ b/ydb/library/formats/arrow/arrow_helpers.cpp @@ -46,29 +46,22 @@ std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared return nullptr; } auto table = TStatusValidator::GetValid(arrow::Table::FromRecordBatches(batches)); - return table ? ToBatch(table, true) : nullptr; + return table ? ToBatch(table) : nullptr; } -std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt, const bool combine) { +std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt) { if (!tableExt) { return nullptr; } - std::shared_ptr<arrow::Table> table; - if (combine) { - auto res = tableExt->CombineChunks(); - Y_ABORT_UNLESS(res.ok()); - table = *res; - } else { - table = tableExt; - } + std::shared_ptr<arrow::Table> res = TStatusValidator::GetValid(tableExt->CombineChunks()); std::vector<std::shared_ptr<arrow::Array>> columns; - columns.reserve(table->num_columns()); - for (auto& col : table->columns()) { - AFL_VERIFY(col->num_chunks() == 1)("size", col->num_chunks())("size_bytes", GetTableDataSize(tableExt)) - ("schema", tableExt->schema()->ToString())("size_new", GetTableDataSize(table)); + columns.reserve(tableExt->num_columns()); + for (auto& col : res->columns()) { + AFL_VERIFY(col->num_chunks() == 1)("size", col->num_chunks())("size_bytes", GetTableDataSize(res))("schema", res->schema()->ToString())( + "size_new", GetTableDataSize(res)); columns.push_back(col->chunk(0)); } - return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns); + return arrow::RecordBatch::Make(res->schema(), res->num_rows(), columns); } // Check if the permutation doesn't reorder anything @@ -205,18 +198,28 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared return builders; } -std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field) { +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field, const ui32 reserveItems, const ui32 reserveSize) { AFL_VERIFY(field); - return MakeBuilder(field->type()); + return MakeBuilder(field->type(), reserveItems, reserveSize); } -std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::DataType>& type) { +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::DataType>& type, const ui32 reserveItems, const ui32 reserveSize) { AFL_VERIFY(type); std::unique_ptr<arrow::ArrayBuilder> builder; TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder)); + if (reserveSize) { + ReserveData(*builder, reserveSize); + } + TStatusValidator::Validate(builder->Reserve(reserveItems)); return std::move(builder); } +std::shared_ptr<arrow::Array> FinishBuilder(std::unique_ptr<arrow::ArrayBuilder>&& builder) { + std::shared_ptr<arrow::Array> array; + TStatusValidator::Validate(builder->Finish(&array)); + return array; +} + std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders) { std::vector<std::shared_ptr<arrow::Array>> out; for (auto& builder : builders) { diff --git a/ydb/library/formats/arrow/arrow_helpers.h b/ydb/library/formats/arrow/arrow_helpers.h index de9f23da43..de50755802 100644 --- a/ydb/library/formats/arrow/arrow_helpers.h +++ b/ydb/library/formats/arrow/arrow_helpers.h @@ -43,19 +43,22 @@ TString SerializeSchema(const arrow::Schema& schema); std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount = 0); std::shared_ptr<arrow::Table> ToTable(const std::shared_ptr<arrow::RecordBatch>& batch); -std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine); +std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable); std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb); std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& sharding, ui32 numShards); std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::vector<ui32>>& shardRows, const ui32 numShards); THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, const THashMap<ui64, std::vector<ui32>>& shardRows); -std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field); -std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::DataType>& type); +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder( + const std::shared_ptr<arrow::Field>& field, const ui32 reserveItems = 0, const ui32 reserveSize = 0); +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder( + const std::shared_ptr<arrow::DataType>& type, const ui32 reserveItems = 0, const ui32 reserveSize = 0); std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema, size_t reserve = 0, const std::map<std::string, ui64>& sizeByColumn = {}); std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders); +std::shared_ptr<arrow::Array> FinishBuilder(std::unique_ptr<arrow::ArrayBuilder>&& builders); std::shared_ptr<arrow::UInt64Array> MakeUI64Array(const ui64 value, const i64 size); std::shared_ptr<arrow::StringArray> MakeStringArray(const TString& value, const i64 size); diff --git a/ydb/library/formats/arrow/protos/accessor.proto b/ydb/library/formats/arrow/protos/accessor.proto index 015ea0b7cf..b9edfe1be1 100644 --- a/ydb/library/formats/arrow/protos/accessor.proto +++ b/ydb/library/formats/arrow/protos/accessor.proto @@ -9,9 +9,20 @@ message TRequestedConstructor { message TSparsed { } + message TSubColumns { + message TSettings { + optional uint32 SparsedDetectorKff = 1 [default = 20]; + optional uint32 ColumnsLimit = 2 [default = 1024]; + optional uint32 ChunkMemoryLimit = 3 [default = 50000000]; + optional double OthersAllowedFraction = 4 [default = 0.05]; + } + optional TSettings Settings = 1; + } + oneof Implementation { TPlain Plain = 10; TSparsed Sparsed = 11; + TSubColumns SubColumns = 12; } } @@ -23,8 +34,30 @@ message TConstructor { message TSparsed { } + + message TSubColumns { + message TSettings { + optional uint32 SparsedDetectorKff = 1 [default = 20]; + optional uint32 ColumnsLimit = 2 [default = 1024]; + optional uint32 ChunkMemoryLimit = 3 [default = 50000000]; + optional double OthersAllowedFraction = 4 [default = 0.05]; + } + optional TSettings Settings = 1; + } oneof Implementation { TPlain Plain = 10; TSparsed Sparsed = 11; + TSubColumns SubColumns = 12; + } +} + +message TSubColumnsAccessor { + optional uint32 ColumnStatsSize = 1; + optional uint32 OtherStatsSize = 2; + message TColumn { + optional uint32 Size = 1; } + repeated TColumn KeyColumns = 3; + repeated TColumn OtherColumns = 4; + optional uint32 OtherRecordsCount = 5; }
\ No newline at end of file diff --git a/ydb/library/formats/arrow/splitter/similar_packer.cpp b/ydb/library/formats/arrow/splitter/similar_packer.cpp index 94395e18c3..e5e32b19b6 100644 --- a/ydb/library/formats/arrow/splitter/similar_packer.cpp +++ b/ydb/library/formats/arrow/splitter/similar_packer.cpp @@ -1,5 +1,51 @@ #include "similar_packer.h" +#include <ydb/library/actors/core/log.h> + +#include <util/string/join.h> + namespace NKikimr::NArrow::NSplitter { +std::vector<ui32> TSimilarPacker::SizesToRecordsCount( + const ui32 serializedRecordsCount, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) { + auto splitPartSizesLocal = splitPartSizesExt; + Y_ABORT_UNLESS(serializedRecordsCount); + { + ui32 sumSizes = 0; + for (auto&& i : splitPartSizesExt) { + sumSizes += i; + } + Y_ABORT_UNLESS(sumSizes <= dataSerialization.size()); + + if (sumSizes < dataSerialization.size()) { + splitPartSizesLocal.emplace_back(dataSerialization.size() - sumSizes); + } + } + Y_ABORT_UNLESS(dataSerialization.size() > splitPartSizesLocal.size()); + std::vector<ui32> recordsCount; + i64 remainedRecordsCount = serializedRecordsCount; + const double rowsPerByte = 1.0 * serializedRecordsCount / dataSerialization.size(); + i32 remainedParts = splitPartSizesLocal.size(); + for (ui32 idx = 0; idx < splitPartSizesLocal.size(); ++idx) { + AFL_VERIFY(remainedRecordsCount >= remainedParts)("remained_records_count", remainedRecordsCount)("remained_parts", remainedParts)( + "idx", idx)("size", splitPartSizesLocal.size())("sizes", JoinSeq(",", splitPartSizesLocal))("data_size", dataSerialization.size()); + --remainedParts; + i64 expectedRecordsCount = rowsPerByte * splitPartSizesLocal[idx]; + if (expectedRecordsCount < 1) { + expectedRecordsCount = 1; + } else if (remainedRecordsCount < expectedRecordsCount + remainedParts) { + expectedRecordsCount = remainedRecordsCount - remainedParts; + } + if (idx + 1 == splitPartSizesLocal.size()) { + expectedRecordsCount = remainedRecordsCount; + } + Y_ABORT_UNLESS(expectedRecordsCount); + recordsCount.emplace_back(expectedRecordsCount); + remainedRecordsCount -= expectedRecordsCount; + Y_ABORT_UNLESS(remainedRecordsCount >= 0); + } + Y_ABORT_UNLESS(remainedRecordsCount == 0); + return recordsCount; } + +} // namespace NKikimr::NArrow::NSplitter diff --git a/ydb/library/formats/arrow/splitter/similar_packer.h b/ydb/library/formats/arrow/splitter/similar_packer.h index 1fdfdf3e7a..3537789c94 100644 --- a/ydb/library/formats/arrow/splitter/similar_packer.h +++ b/ydb/library/formats/arrow/splitter/similar_packer.h @@ -1,4 +1,5 @@ #pragma once +#include <util/generic/string.h> #include <util/system/types.h> #include <vector> @@ -10,11 +11,11 @@ class TArrayView { private: typename TContainer::iterator Begin; typename TContainer::iterator End; + public: TArrayView(typename TContainer::iterator itBegin, typename TContainer::iterator itEnd) : Begin(itBegin) , End(itEnd) { - } typename TContainer::iterator begin() { @@ -44,11 +45,10 @@ using TVectorView = TArrayView<std::vector<TObject>>; class TSimilarPacker { private: const ui64 BottomLimitNecessary = 0; + public: TSimilarPacker(const ui64 bottomLimitNecessary) - : BottomLimitNecessary(bottomLimitNecessary) - { - + : BottomLimitNecessary(bottomLimitNecessary) { } template <class TObject> @@ -58,7 +58,7 @@ public: fullSize += i.GetSize(); } if (fullSize <= BottomLimitNecessary) { - return {TVectorView<TObject>(objects.begin(), objects.end())}; + return { TVectorView<TObject>(objects.begin(), objects.end()) }; } ui64 currentSize = 0; ui64 currentStart = 0; @@ -76,6 +76,9 @@ public: } return result; } + + static std::vector<ui32> SizesToRecordsCount( + const ui32 serializedRecordsCount, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt); }; -} +} // namespace NKikimr::NArrow::NSplitter diff --git a/ydb/library/formats/arrow/splitter/stats.cpp b/ydb/library/formats/arrow/splitter/stats.cpp index c815485ada..f981e0d3d6 100644 --- a/ydb/library/formats/arrow/splitter/stats.cpp +++ b/ydb/library/formats/arrow/splitter/stats.cpp @@ -21,4 +21,13 @@ std::optional<TBatchSerializationStat> TSerializationStats::GetStatsForRecordBat return GetStatsForRecordBatch(rb->schema()); } + TSimpleSerializationStat::TSimpleSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes) + : SerializedBytes(bytes) + , RecordsCount(recordsCount) + , RawBytes(rawBytes) { + Y_ABORT_UNLESS(SerializedBytes); + Y_ABORT_UNLESS(RecordsCount); +// Y_ABORT_UNLESS(RawBytes); +} + } diff --git a/ydb/library/formats/arrow/splitter/stats.h b/ydb/library/formats/arrow/splitter/stats.h index 447e59b68c..5c8c66b7f9 100644 --- a/ydb/library/formats/arrow/splitter/stats.h +++ b/ydb/library/formats/arrow/splitter/stats.h @@ -20,15 +20,7 @@ protected: ui64 RawBytes = 0; public: TSimpleSerializationStat() = default; - TSimpleSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes) - : SerializedBytes(bytes) - , RecordsCount(recordsCount) - , RawBytes(rawBytes) - { - Y_ABORT_UNLESS(SerializedBytes); - Y_ABORT_UNLESS(RecordsCount); - Y_ABORT_UNLESS(RawBytes); - } + TSimpleSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes); TString DebugString() const { return TStringBuilder() << "{" |