diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-04-09 09:49:41 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-09 09:49:41 +0300 |
commit | c8694035ff4df52fdee6834b69fa5fca38e7980c (patch) | |
tree | f260e0e8f3716aff8e01f270b145538afea4389a | |
parent | 0a86472d9f7a51a11e2fbcb871511d6e74132ec0 (diff) | |
download | ydb-c8694035ff4df52fdee6834b69fa5fca38e7980c.tar.gz |
use sequential read for data copy (#3555)
-rw-r--r-- | ydb/core/formats/arrow/common/accessor.h | 8 |
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 26 |
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp | 2 |
3 files changed, 28 insertions, 8 deletions
diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index 56cf25096e7..acd7e3a650c 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -50,6 +50,14 @@ public: YDB_READONLY(ui64, Position, 0); YDB_READONLY(ui64, ChunkIdx, 0); public: + bool NextPosition() { + if (Position + 1 < (ui32)Array->length()) { + ++Position; + return true; + } + return false; + } + TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position, const ui64 chunkIdx) : Array(arr) , Position(position) diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 846a2f5eb75..cb932d2cd1e 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -294,21 +294,33 @@ TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor, NArrow::NAccessor::IChunkedArray::TReader reader(trivialChunkedArray); std::optional<ui32> chunkIdx; + std::optional<ui32> currentIdxFrom; + std::optional<NArrow::NAccessor::IChunkedArray::TAddress> address; + const typename TElementAccessor::TArrayType* currentArray = nullptr; const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) { - auto address = reader.GetReadChunk(rowIndexFrom); - auto* currentArray = static_cast<const typename TElementAccessor::TArrayType*>(address.GetArray().get()); - - if (!chunkIdx || *chunkIdx != address.GetChunkIdx()) { + if (!currentIdxFrom) { + address = reader.GetReadChunk(rowIndexFrom); + AFL_ENSURE(rowIndexFrom == 0)("real", rowIndexFrom); + } else { + AFL_ENSURE(rowIndexFrom == *currentIdxFrom + 1)("next", rowIndexFrom)("current", *currentIdxFrom); + if (!address->NextPosition()) { + address = reader.GetReadChunk(rowIndexFrom); + } + } + currentIdxFrom = rowIndexFrom; + + if (!chunkIdx || *chunkIdx != address->GetChunkIdx()) { + currentArray = static_cast<const typename 
TElementAccessor::TArrayType*>(address->GetArray().get()); TElementAccessor::Validate(*currentArray); - chunkIdx = address.GetChunkIdx(); + chunkIdx = address->GetChunkIdx(); } auto& rowItem = editAccessor(rowIndexTo, columnIndex); - if (currentArray->IsNull(address.GetPosition())) { + if (currentArray->IsNull(address->GetPosition())) { statAccumulator.AddNull(); rowItem = NUdf::TUnboxedValue(); } else { - rowItem = TElementAccessor::ExtractValue(*currentArray, address.GetPosition()); + rowItem = TElementAccessor::ExtractValue(*currentArray, address->GetPosition()); statAccumulator.AddValue(rowItem); } }; diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp index 89eb5551249..6d487a26016 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp @@ -15,7 +15,7 @@ namespace NKikimr { const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor(); for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) { auto vDescription = descriptor->value(i); - CodesCount.emplace(vDescription->name(), TBase::GetDeriviative("Replies/" + vDescription->name() + "/Count")); + CodesCount.emplace(vDescription->name(), CreateSubGroup("reply_code", vDescription->name()).GetDeriviative("Replies/Count")); } } |