aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-04-09 09:49:41 +0300
committerGitHub <noreply@github.com>2024-04-09 09:49:41 +0300
commitc8694035ff4df52fdee6834b69fa5fca38e7980c (patch)
treef260e0e8f3716aff8e01f270b145538afea4389a
parent0a86472d9f7a51a11e2fbcb871511d6e74132ec0 (diff)
downloadydb-c8694035ff4df52fdee6834b69fa5fca38e7980c.tar.gz
use sequential read for data copy (#3555)
-rw-r--r--ydb/core/formats/arrow/common/accessor.h8
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp26
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp2
3 files changed, 28 insertions, 8 deletions
diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h
index 56cf25096e7..acd7e3a650c 100644
--- a/ydb/core/formats/arrow/common/accessor.h
+++ b/ydb/core/formats/arrow/common/accessor.h
@@ -50,6 +50,14 @@ public:
YDB_READONLY(ui64, Position, 0);
YDB_READONLY(ui64, ChunkIdx, 0);
public:
+ bool NextPosition() {
+ if (Position + 1 < (ui32)Array->length()) {
+ ++Position;
+ return true;
+ }
+ return false;
+ }
+
TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position, const ui64 chunkIdx)
: Array(arr)
, Position(position)
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 846a2f5eb75..cb932d2cd1e 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -294,21 +294,33 @@ TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
NArrow::NAccessor::IChunkedArray::TReader reader(trivialChunkedArray);
std::optional<ui32> chunkIdx;
+ std::optional<ui32> currentIdxFrom;
+ std::optional<NArrow::NAccessor::IChunkedArray::TAddress> address;
+ const typename TElementAccessor::TArrayType* currentArray = nullptr;
const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) {
- auto address = reader.GetReadChunk(rowIndexFrom);
- auto* currentArray = static_cast<const typename TElementAccessor::TArrayType*>(address.GetArray().get());
-
- if (!chunkIdx || *chunkIdx != address.GetChunkIdx()) {
+ if (!currentIdxFrom) {
+ address = reader.GetReadChunk(rowIndexFrom);
+ AFL_ENSURE(rowIndexFrom == 0)("real", rowIndexFrom);
+ } else {
+ AFL_ENSURE(rowIndexFrom == *currentIdxFrom + 1)("next", rowIndexFrom)("current", *currentIdxFrom);
+ if (!address->NextPosition()) {
+ address = reader.GetReadChunk(rowIndexFrom);
+ }
+ }
+ currentIdxFrom = rowIndexFrom;
+
+ if (!chunkIdx || *chunkIdx != address->GetChunkIdx()) {
+ currentArray = static_cast<const typename TElementAccessor::TArrayType*>(address->GetArray().get());
TElementAccessor::Validate(*currentArray);
- chunkIdx = address.GetChunkIdx();
+ chunkIdx = address->GetChunkIdx();
}
auto& rowItem = editAccessor(rowIndexTo, columnIndex);
- if (currentArray->IsNull(address.GetPosition())) {
+ if (currentArray->IsNull(address->GetPosition())) {
statAccumulator.AddNull();
rowItem = NUdf::TUnboxedValue();
} else {
- rowItem = TElementAccessor::ExtractValue(*currentArray, address.GetPosition());
+ rowItem = TElementAccessor::ExtractValue(*currentArray, address->GetPosition());
statAccumulator.AddValue(rowItem);
}
};
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
index 89eb5551249..6d487a26016 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
@@ -15,7 +15,7 @@ namespace NKikimr {
const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
auto vDescription = descriptor->value(i);
- CodesCount.emplace(vDescription->name(), TBase::GetDeriviative("Replies/" + vDescription->name() + "/Count"));
+ CodesCount.emplace(vDescription->name(), CreateSubGroup("reply_code", vDescription->name()).GetDeriviative("Replies/Count"));
}
}