| author    | ivanmorozov <ivanmorozov@yandex-team.com>           | 2023-11-26 18:10:44 +0300 |
|-----------|-----------------------------------------------------|---------------------------|
| committer | ivanmorozov <ivanmorozov@yandex-team.com>           | 2023-11-26 18:29:50 +0300 |
| commit    | 523f645a83a0ec97a0332dbc3863bb354c92a328 (patch)    |                           |
| tree      | dd428c50f0479150bff2775db78b278352d2e761            |                           |
| parent    | c696905d58037337c9b473a04036ab605ec5e3fd (diff)     |                           |
| download  | ydb-523f645a83a0ec97a0332dbc3863bb354c92a328.tar.gz |                           |
KIKIMR-20181: indexed access for repack intervals case
| -rw-r--r-- | ydb/core/formats/arrow/permutations.cpp                 | 20 |
| -rw-r--r-- | ydb/core/formats/arrow/permutations.h                   | 81 |
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_events.h         | 14 |
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp   |  2 |
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp   |  3 |
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp |  7 |
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_events.h            | 35 |
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp                 |  2 |
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp                  | 68 |
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.h                    | 58 |
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data_ut.cpp               |  6 |
| -rw-r--r-- | ydb/core/protos/kqp.proto                               |  1 |
12 files changed, 196 insertions, 101 deletions
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index 173193eec3..bac97268ac 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -109,7 +109,8 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
     return out;
 }
 
-std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes) {
+template <class TIndex>
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutationImpl(const std::vector<TIndex>& indexes) {
     if (indexes.empty()) {
         return {};
     }
@@ -127,6 +128,14 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64
     return out;
 }
 
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui32>& indexes) {
+    return MakeFilterPermutationImpl(indexes);
+}
+
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes) {
+    return MakeFilterPermutationImpl(indexes);
+}
+
 std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) {
     Y_ABORT_UNLESS(!!source);
     auto schema = source->schema();
@@ -305,12 +314,13 @@ ui64 TShardedRecordBatch::GetMemorySize() const {
     return NArrow::GetBatchMemorySize(RecordBatch);
 }
 
-TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch): RecordBatch(batch) {
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch)
+    : RecordBatch(batch)
+{
     AFL_VERIFY(RecordBatch);
-    SplittedByShards = {RecordBatch};
 }
 
-TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards)
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::vector<ui32>>&& splittedByShards)
     : RecordBatch(batch)
     , SplittedByShards(std::move(splittedByShards))
 {
@@ -356,7 +366,7 @@ NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shard
         Y_ABORT_UNLESS(false);
     }
     auto resultBatch = NArrow::TStatusValidator::GetValid(input->RemoveColumn(input->schema()->GetFieldIndex(hashColumnName)));
-    return TShardedRecordBatch(resultBatch, splitter->Apply(resultBatch));
+    return TShardedRecordBatch(resultBatch, splitter->DetachRemapping());
 }
 
 std::shared_ptr<arrow::UInt64Array> TShardingSplitIndex::BuildPermutation() const {
diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h
index 8acf5240fe..4e65e75932 100644
--- a/ydb/core/formats/arrow/permutations.h
+++ b/ydb/core/formats/arrow/permutations.h
@@ -16,15 +16,16 @@ public:
 class TShardedRecordBatch {
 private:
     YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, RecordBatch);
-    YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, SplittedByShards);
+    YDB_READONLY_DEF(std::vector<std::vector<ui32>>, SplittedByShards);
 public:
     TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
 
     void Cut(const ui32 limit) {
         RecordBatch = RecordBatch->Slice(0, limit);
         for (auto&& i : SplittedByShards) {
-            if (i->num_rows() > limit) {
-                i = i->Slice(0, limit);
+            auto it = std::lower_bound(i.begin(), i.end(), limit);
+            if (it != i.end()) {
+                i.erase(it, i.end());
             }
         }
     }
@@ -33,15 +34,10 @@ public:
         return SplittedByShards.size() > 1;
     }
 
-    TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards);
+    TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::vector<ui32>>&& splittedByShards);
 
     void StripColumns(const std::shared_ptr<arrow::Schema>& schema) {
-        if (RecordBatch) {
-            RecordBatch = NArrow::ExtractColumns(RecordBatch, schema);
-        }
-        for (auto&& i : SplittedByShards) {
-            i = NArrow::ExtractColumns(i, schema);
-        }
+        RecordBatch = NArrow::ExtractColumns(RecordBatch, schema);
     }
 
     ui64 GetMemorySize() const;
@@ -54,19 +50,67 @@ public:
 class TShardingSplitIndex {
 private:
     ui32 ShardsCount = 0;
-    std::vector<std::vector<ui64>> Remapping;
+    std::vector<std::vector<ui32>> Remapping;
     ui32 RecordsCount = 0;
 
+    template <class TIterator>
+    std::vector<ui32> MergeLists(const std::vector<ui32>& base, const TIterator itFrom, const TIterator itTo) {
+        std::vector<ui32> result;
+        result.reserve(base.size() + (itTo - itFrom));
+        auto itBase = base.begin();
+        auto itExt = itFrom;
+        while (itBase != base.end() && itExt != itTo) {
+            if (*itBase < *itExt) {
+                result.emplace_back(*itBase);
+                ++itBase;
+            } else {
+                result.emplace_back(*itExt);
+                ++itExt;
+            }
+        }
+        if (itBase == base.end()) {
+            result.insert(result.end(), itExt, itTo);
+        } else if (itExt == itTo) {
+            result.insert(result.end(), itBase, base.end());
+        }
+        return result;
+    }
+
     template <class TIntArrowArray>
     void Initialize(const TIntArrowArray& arrowHashArray) {
         Y_ABORT_UNLESS(ShardsCount);
         Remapping.resize(ShardsCount);
+        const ui32 expectation = arrowHashArray.length() / ShardsCount + 1;
+        for (auto&& i : Remapping) {
+            i.reserve(2 * expectation);
+        }
         for (ui64 i = 0; i < (ui64)arrowHashArray.length(); ++i) {
-            const auto v = arrowHashArray.GetView(i);
-            if (v < 0) {
-                Remapping[(-v) % ShardsCount].emplace_back(i);
-            } else {
-                Remapping[v % ShardsCount].emplace_back(i);
+            const i64 v = arrowHashArray.GetView(i);
+            const ui32 idx = ((v < 0) ? (-v) : v) % ShardsCount;
+            Remapping[idx].emplace_back(i);
+        }
+        std::deque<std::vector<ui32>*> sizeCorrection;
+        for (auto&& i : Remapping) {
+            sizeCorrection.emplace_back(&i);
+        }
+        const auto pred = [](const std::vector<ui32>* l, const std::vector<ui32>* r) {
+            return l->size() < r->size();
+        };
+        std::sort(sizeCorrection.begin(), sizeCorrection.end(), pred);
+        while (sizeCorrection.size() > 1 && sizeCorrection.back()->size() > expectation && sizeCorrection.front()->size() < expectation) {
+            const ui32 uselessRecords = sizeCorrection.back()->size() - expectation;
+            const ui32 needRecords = expectation - sizeCorrection.front()->size();
+            const ui32 moveRecords = std::min<ui32>(needRecords, uselessRecords);
+            if (moveRecords == 0) {
+                break;
+            }
+            *sizeCorrection.front() = MergeLists(*sizeCorrection.front(), sizeCorrection.back()->end() - moveRecords, sizeCorrection.back()->end());
+            sizeCorrection.back()->resize(sizeCorrection.back()->size() - moveRecords);
+            if (sizeCorrection.back()->size() <= expectation) {
+                sizeCorrection.pop_back();
+            }
+            if (sizeCorrection.front()->size() >= expectation) {
+                sizeCorrection.pop_front();
            }
        }
    }
@@ -78,6 +122,10 @@ private:
 
 public:
+    std::vector<std::vector<ui32>> DetachRemapping() {
+        return std::move(Remapping);
+    }
+
     template <class TArrayClass>
     static TShardingSplitIndex Build(const ui32 shardsCount, const arrow::Array& arrowHashArray) {
         TShardingSplitIndex result(shardsCount, arrowHashArray);
@@ -94,6 +142,7 @@ public:
 
 std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool reverse = false);
 std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes);
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui32>& indexes);
 std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
 
 std::shared_ptr<arrow::RecordBatch> ReverseRecords(const std::shared_ptr<arrow::RecordBatch>& batch);
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index 2d504dd269..f2efd23f21 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -36,7 +36,7 @@ struct TEvKqpCompute {
         ui32 Generation;
         TVector<TOwnedCellVec> Rows;
         std::shared_ptr<arrow::RecordBatch> ArrowBatch;
-        std::vector<std::shared_ptr<arrow::RecordBatch>> SplittedBatches;
+        std::vector<std::vector<ui32>> SplittedBatches;
 
         TOwnedCellVec LastKey;
         TDuration CpuTime;
@@ -50,12 +50,6 @@ struct TEvKqpCompute {
         ui32 GetRowsCount() const {
             if (ArrowBatch) {
                 return ArrowBatch->num_rows();
-            } else if (SplittedBatches.size()) {
-                ui32 recordsCount = 0;
-                for (auto&& i : SplittedBatches) {
-                    recordsCount += i->num_rows();
-                }
-                return recordsCount;
             } else {
                 return Rows.size();
             }
@@ -113,9 +107,6 @@ struct TEvKqpCompute {
                 auto schema = NArrow::DeserializeSchema(batch.GetSchema());
                 ev->ArrowBatch = NArrow::DeserializeBatch(batch.GetBatch(), schema);
             }
-            for (auto&& i : pbEv->Record.GetSplittedArrowBatches()) {
-                ev->SplittedBatches.emplace_back(NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(i)));
-            }
             return ev.Release();
         }
 
@@ -155,9 +146,6 @@ struct TEvKqpCompute {
                         break;
                     }
                 }
-                for (auto&& i : SplittedBatches) {
-                    Remote->Record.AddSplittedArrowBatches(NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(i));
-                }
             }
         }
     };
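
The three diffs above replace per-shard RecordBatch copies with one shared batch plus per-shard row-index vectors (SplittedByShards becomes std::vector<std::vector<ui32>>), so a shard's rows are only materialized on demand through a filter permutation. A minimal standalone sketch of that materialization step, assuming plain Arrow C++; the helper name and the way the index list is produced are illustrative, not code from this patch:

```cpp
#include <arrow/api.h>
#include <arrow/compute/api_vector.h>

#include <memory>
#include <vector>

// Illustrative helper (not part of the patch): materialize one shard's rows
// from a shared batch using a row-index list, the same Take-based mechanism
// that MakeFilterPermutation + arrow::compute::Take rely on above.
std::shared_ptr<arrow::RecordBatch> TakeRows(
        const std::shared_ptr<arrow::RecordBatch>& batch,
        const std::vector<uint32_t>& indexes) {
    arrow::UInt64Builder builder;
    if (!builder.Reserve(indexes.size()).ok()) {
        return nullptr;
    }
    for (const uint32_t i : indexes) {
        builder.UnsafeAppend(i);  // row positions inside `batch`
    }
    std::shared_ptr<arrow::UInt64Array> permutation;
    if (!builder.Finish(&permutation).ok()) {
        return nullptr;
    }
    // Take copies only the selected rows into a new batch.
    auto taken = arrow::compute::Take(batch, permutation);
    return taken.ok() ? taken->record_batch() : nullptr;
}
```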
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 748d1a30cd..1dfe1fbab6 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -221,7 +221,7 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
             }
             case NKikimrDataEvents::FORMAT_ARROW: {
                 if(msg.ArrowBatch != nullptr) {
-                    bytes = ScanData->AddData(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory());
+                    bytes = ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.ArrowBatch), {}, TaskRunner->GetHolderFactory());
                     rowsCount = msg.ArrowBatch->num_rows();
                 }
                 break;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index dc046412fd..709d19bd55 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -6,6 +6,7 @@
 #include <ydb/core/base/feature_flags.h>
 #include <ydb/core/kqp/rm_service/kqp_rm_service.h>
 #include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
 #include <ydb/core/kqp/common/kqp_resolve.h>
 
 #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
@@ -129,7 +130,7 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {
     auto& msg = *ev->Get();
     auto guard = TaskRunner->BindAllocator();
     if (!!msg.GetArrowBatch()) {
-        ScanData->AddData(*msg.GetArrowBatch(), msg.GetTabletId(), TaskRunner->GetHolderFactory());
+        ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes())), msg.GetTabletId(), TaskRunner->GetHolderFactory());
     } else {
         ScanData->AddData(std::move(msg.MutableRows()), msg.GetTabletId(), TaskRunner->GetHolderFactory());
     }
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index a5ebda9aeb..4c356d2d58 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -29,11 +29,14 @@ std::vector<std::unique_ptr<TComputeTaskData>> TShardScannerInfo::OnReceiveData(
     std::vector<std::unique_ptr<TComputeTaskData>> result;
     if (data.SplittedBatches.size() > 1) {
         ui32 idx = 0;
+        AFL_ENSURE(data.ArrowBatch);
         for (auto&& i : data.SplittedBatches) {
-            result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(TabletId, i), idx++));
+            result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId, std::move(i)), idx++));
         }
+    } else if (data.ArrowBatch) {
+        result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId)));
     } else {
-        result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data, TabletId)));
+        result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(std::move(data.Rows), TabletId)));
     }
     AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "receive_data")("count_chunks", result.size());
     DataChunksInFlightCount = result.size();
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_events.h b/ydb/core/kqp/compute_actor/kqp_scan_events.h
index b4e638c6e7..d973b31fd3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_events.h
@@ -39,40 +39,29 @@ struct TEvScanExchange {
 
     class TEvSendData: public NActors::TEventLocal<TEvSendData, EvSendData> {
     private:
-        YDB_ACCESSOR_DEF(std::shared_ptr<arrow::RecordBatch>, ArrowBatch);
+        YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, ArrowBatch);
         YDB_ACCESSOR_DEF(TVector<TOwnedCellVec>, Rows);
-        YDB_ACCESSOR(ui64, TabletId, 0);
+        YDB_READONLY(ui64, TabletId, 0);
+        YDB_ACCESSOR_DEF(std::vector<ui32>, DataIndexes);
     public:
         ui32 GetRowsCount() const {
             return ArrowBatch ? ArrowBatch->num_rows() : Rows.size();
         }
 
-        TEvSendData(const ui64 tabletId, const std::shared_ptr<arrow::RecordBatch>& batch)
-            : ArrowBatch(batch)
+        TEvSendData(const std::shared_ptr<arrow::RecordBatch>& arrowBatch, const ui64 tabletId)
+            : ArrowBatch(arrowBatch)
             , TabletId(tabletId) {
+            Y_ABORT_UNLESS(ArrowBatch);
+            Y_ABORT_UNLESS(ArrowBatch->num_rows());
         }
 
-        TEvSendData(TEvKqpCompute::TEvScanData& msg, const ui64 tabletId)
-            : TabletId(tabletId) {
-            switch (msg.GetDataFormat()) {
-                case NKikimrDataEvents::FORMAT_CELLVEC:
-                case NKikimrDataEvents::FORMAT_UNSPECIFIED:
-                    Rows = std::move(msg.Rows);
-                    Y_ABORT_UNLESS(Rows.size());
-                    break;
-                case NKikimrDataEvents::FORMAT_ARROW:
-                    ArrowBatch = msg.ArrowBatch;
-                    Y_ABORT_UNLESS(ArrowBatch);
-                    Y_ABORT_UNLESS(ArrowBatch->num_rows());
-                    break;
-            }
-
-        }
-
-        TEvSendData(std::shared_ptr<arrow::RecordBatch> arrowBatch, const ui64 tabletId)
+        TEvSendData(const std::shared_ptr<arrow::RecordBatch>& arrowBatch, const ui64 tabletId, std::vector<ui32>&& dataIndexes)
             : ArrowBatch(arrowBatch)
-            , TabletId(tabletId) {
+            , TabletId(tabletId)
+            , DataIndexes(std::move(dataIndexes))
+        {
+            Y_ABORT_UNLESS(ArrowBatch);
             Y_ABORT_UNLESS(ArrowBatch->num_rows());
         }
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 73c3e48b55..8ac2f7bd9d 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -1123,7 +1123,7 @@ public:
             } else {
                 hasResultColumns = true;
                 stats.AddStatistics(
-                    NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, column.TypeInfo)
+                    NMiniKQL::WriteColumnValuesFromArrow(editAccessors, NMiniKQL::TBatchDataAccessor(result->Get()->GetArrowBatch()), columnIndex, resultColumnIndex, column.TypeInfo)
                 );
                 if (column.NotNull) {
                     std::shared_ptr<arrow::Array> columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex);
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 67cbb800c7..6662889cfc 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -281,19 +281,31 @@ public:
 
 template <class TElementAccessor, class TAccessor>
 TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
-    const arrow::RecordBatch& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) {
+    const TBatchDataAccessor& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) {
     auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(arrayExt);
     TElementAccessor::Validate(array);
     auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType);
-    for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
-        auto& rowItem = editAccessor(rowIndex, columnIndex);
-        if (array.IsNull(rowIndex)) {
+
+    const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) {
+        auto& rowItem = editAccessor(rowIndexTo, columnIndex);
+        if (array.IsNull(rowIndexFrom)) {
             statAccumulator.AddNull();
             rowItem = NUdf::TUnboxedValue();
         } else {
-            rowItem = TElementAccessor::ExtractValue(array, rowIndex);
+            rowItem = TElementAccessor::ExtractValue(array, rowIndexFrom);
             statAccumulator.AddValue(rowItem);
         }
+    };
+
+    if (batch.HasDataIndexes()) {
+        ui32 idx = 0;
+        for (const i64 rowIndex: batch.GetDataIndexes()) {
+            applyToIndex(rowIndex, idx++);
+        }
+    } else {
+        for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
+            applyToIndex(rowIndex, rowIndex);
+        }
     }
     return statAccumulator.Finish();
 }
@@ -301,8 +313,8 @@
 
 template <class TAccessor>
 TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
-    const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
-    std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex);
+    const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
+    std::shared_ptr<arrow::Array> columnSharedPtr = batch.GetBatch()->column(columnIndex);
     arrow::Array* columnPtr = columnSharedPtr.get();
     namespace NTypeIds = NScheme::NTypeIds;
     switch (columnType.GetTypeId()) {
@@ -396,7 +408,7 @@ TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
 }
 
 TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType)
+    const TBatchDataAccessor& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType)
 {
     const auto accessor = [editAccessors, columnsCount](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
         return editAccessors[rowIndex * columnsCount + colIndex];
@@ -405,7 +417,7 @@ TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
 }
 
 TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
+    const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
 {
     const auto accessor = [&editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
         return editAccessors[rowIndex][colIndex];
@@ -414,7 +426,7 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
 }
 
 TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType)
+    const TBatchDataAccessor& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType)
 {
     const auto accessor = [=, &editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
         YQL_ENSURE(colIndex == columnIndex);
@@ -538,17 +550,17 @@ ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& ba
     return stats.AllocatedBytes;
 }
 
-TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId,
     const THolderFactory& holderFactory)
 {
     TBytesStatistics stats;
     TUnboxedValueVector cells;
     if (TotalColumnsCount == 0u) {
-        cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
-        stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
+        cells.resize(batch.GetRecordsCount(), holderFactory.GetEmptyContainer());
+        stats.AddStatistics({ sizeof(ui64) * batch.GetRecordsCount(), sizeof(ui64) * batch.GetRecordsCount() });
     } else {
-        cells.resize(batch.num_rows() * TotalColumnsCount);
+        cells.resize(batch.GetRecordsCount() * TotalColumnsCount);
 
         for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) {
             stats.AddStatistics(
@@ -557,16 +569,16 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(con
         }
 
         if (!SystemColumns.empty()) {
-            for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
+            for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
                 FillSystemColumns(&cells[rowIndex * TotalColumnsCount + ResultColumns.size()], shardId, SystemColumns);
             }
-            stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
+            stats.AllocatedBytes += batch.GetRecordsCount() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
         }
     }
 
     if (cells.size()) {
-        RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.num_rows(), std::move(cells), stats.AllocatedBytes));
+        RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.GetRecordsCount(), std::move(cells), stats.AllocatedBytes));
     }
 
     StoredBytes += stats.AllocatedBytes;
@@ -574,18 +586,18 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(con
     return stats;
 }
 
-TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> /*shardId*/,
+TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> /*shardId*/,
     const THolderFactory& holderFactory)
 {
     TBytesStatistics stats;
     auto totalColsCount = TotalColumnsCount + 1;
 
     TUnboxedValueVector batchValues;
     batchValues.resize(totalColsCount);
-
-    for (int i = 0; i < batch.num_columns(); ++i) {
-        batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i)));
+    std::shared_ptr<arrow::RecordBatch> filtered = batch.GetFiltered();
+    for (int i = 0; i < filtered->num_columns(); ++i) {
+        batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(filtered->column_data(i)));
     }
 
-    ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(batch);
+    const ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(*filtered);
     stats.AddStatistics({batchByteSize, batchByteSize});
 
     // !!! TODO !!!
@@ -597,26 +609,26 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(c
     //     stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
     // }
 
-    batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows())));
-    stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
+    batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.GetRecordsCount())));
+    stats.AddStatistics({ sizeof(ui64) * batch.GetRecordsCount(), sizeof(ui64) * batch.GetRecordsCount()});
 
-    BlockBatches.emplace(TBlockBatch(totalColsCount, batch.num_rows(), std::move(batchValues), stats.AllocatedBytes));
+    BlockBatches.emplace(TBlockBatch(totalColsCount, batch.GetRecordsCount(), std::move(batchValues), stats.AllocatedBytes));
     StoredBytes += stats.AllocatedBytes;
 
     return stats;
 }
 
-ui64 TKqpScanComputeContext::TScanData::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+ui64 TKqpScanComputeContext::TScanData::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId,
     const THolderFactory& holderFactory)
 {
     // RecordBatch hasn't empty method so check the number of rows
-    if (Finished || batch.num_rows() == 0) {
+    if (Finished || batch.GetRecordsCount() == 0) {
         return 0;
     }
 
     TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
     if (BasicStats) {
-        BasicStats->Rows += batch.num_rows();
+        BasicStats->Rows += batch.GetRecordsCount();
         BasicStats->Bytes += stats.DataBytes;
     }
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index d166fce4ab..224144301d 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -7,6 +7,7 @@
 
 #include <ydb/core/engine/minikql/minikql_engine_host.h>
 #include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/permutations.h>
 #include <ydb/core/scheme/scheme_tabledefs.h>
 #include <ydb/core/tablet_flat/flat_database.h>
 
@@ -17,6 +18,7 @@
 #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
 
 #include <ydb/library/yql/utils/yql_panic.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
 
 namespace NKikimrTxDataShard {
     class TKqpTransaction_TScanTaskMeta;
@@ -57,13 +59,55 @@ struct TBytesStatistics {
 };
 
+class TBatchDataAccessor {
+private:
+    YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
+    YDB_READONLY_DEF(std::vector<ui32>, DataIndexes);
+    mutable std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+public:
+    std::shared_ptr<arrow::RecordBatch> GetFiltered() const {
+        if (!FilteredBatch) {
+            if (DataIndexes.size()) {
+                auto permutation = NArrow::MakeFilterPermutation(DataIndexes);
+                FilteredBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(Batch, permutation)).record_batch();
+            } else {
+                FilteredBatch = Batch;
+            }
+        }
+        return FilteredBatch;
+    }
+
+    bool HasDataIndexes() const {
+        return DataIndexes.size();
+    }
+
+    ui32 GetRecordsCount() const {
+        return DataIndexes.size() ? DataIndexes.size() : Batch->num_rows();
+    }
+
+    TBatchDataAccessor(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<ui32>&& dataIndexes)
+        : Batch(batch)
+        , DataIndexes(std::move(dataIndexes))
+    {
+        AFL_VERIFY(Batch);
+        AFL_VERIFY(Batch->num_rows());
+    }
+
+    TBatchDataAccessor(const std::shared_ptr<arrow::RecordBatch>& batch)
+        : Batch(batch) {
+        AFL_VERIFY(Batch);
+        AFL_VERIFY(Batch->num_rows());
+
+    }
+};
+
 TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type);
 
 TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
+    const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
 TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
+    const TBatchDataAccessor& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
 TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
-    const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType);
+    const TBatchDataAccessor& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType);
 
 void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type);
 
@@ -98,7 +142,7 @@ public:
     }
 
     ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
-    ui64 AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
+    ui64 AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
 
     bool IsEmpty() const {
         return BatchReader->IsEmpty();
@@ -165,7 +209,7 @@ public:
        }
 
        virtual TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
-       virtual TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
+       virtual TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
        virtual ui32 FillDataValues(NUdf::TUnboxedValue* const* result) = 0;
        virtual void Clear() = 0;
        virtual bool IsEmpty() const = 0;
@@ -191,7 +235,7 @@ public:
        }
 
        TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
-       TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+       TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
        ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
 
        void Clear() override {
@@ -256,7 +300,7 @@ public:
        }
 
        TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
-       TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+       TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
        ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
 
        void Clear() override {
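
The TBatchDataAccessor introduced above lets readers consume a batch either densely or through an index list without first materializing a filtered copy; the applyToIndex lambda in WriteColumnValuesFromArrowSpecImpl is the row-wise form of that idea. A self-contained sketch of the same dual-path iteration; the struct and function names here are placeholders, not the actual KQP types:

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for TBatchDataAccessor: a dense row count plus an
// optional list of selected source rows.
struct TIndexedView {
    uint32_t RecordsCount = 0;
    std::vector<uint32_t> DataIndexes;   // empty => read every row in order
};

// Visit rows either densely (0..RecordsCount-1) or through the index list,
// passing (sourceRow, destinationRow) pairs like applyToIndex above:
// destinations stay dense even when sources are scattered.
void ForEachRow(const TIndexedView& view,
                const std::function<void(uint32_t from, uint32_t to)>& apply) {
    if (!view.DataIndexes.empty()) {
        uint32_t dst = 0;
        for (const uint32_t src : view.DataIndexes) {
            apply(src, dst++);
        }
    } else {
        for (uint32_t row = 0; row < view.RecordsCount; ++row) {
            apply(row, row);
        }
    }
}
```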
diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
index bd7ac9eef2..508d10cb6e 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
@@ -249,7 +249,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
 
         TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, rows.front().Columns());
 
-        scanData.AddData(*batch, {}, factory);
+        scanData.AddData(batch, {}, factory);
 
         std::vector<NUdf::TUnboxedValue> container;
         container.resize(20);
@@ -305,7 +305,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
         resultCols.push_back(resCol);
 
         TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, resultCols);
-        scanData.AddData(*batch, {}, factory);
+        scanData.AddData(batch, {}, factory);
 
         std::vector<NUdf::TUnboxedValue> container;
         container.resize(1);
@@ -353,7 +353,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
         TVector<TDataRow> rows = TestRows();
         std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows, rows.front().MakeArrowSchema());
 
-        auto bytes = scanData.AddData(*anotherEmptyBatch, {}, factory);
+        auto bytes = scanData.AddData(anotherEmptyBatch, {}, factory);
         UNIT_ASSERT(bytes > 0);
         std::vector<NUdf::TUnboxedValue*> containerPtr;
         for (const auto& row: rows) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index e04ecff03f..5a5abc93ff 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -610,7 +610,6 @@ message TEvRemoteScanData {
     optional bool RequestedBytesLimitReached = 11 [default = false];
 
     optional uint32 AvailablePacks = 12;
-    repeated bytes SplittedArrowBatches = 13;
 }
 
 message TEvRemoteScanDataAck {
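
Dropping SplittedArrowBatches from TEvRemoteScanData is consistent with the rest of the patch: the per-shard split is no longer serialized, because a single batch plus per-shard index vectors now travels in the local TEvSendData event. A rough, self-contained sketch of that fan-out, mirroring TShardScannerInfo::OnReceiveData above; the event struct below is a simplified stand-in for TEvScanExchange::TEvSendData, not the real type:

```cpp
#include <arrow/api.h>

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Local stand-in for TEvScanExchange::TEvSendData (simplified): one shared
// RecordBatch plus an optional list of row indexes for a single consumer.
struct TSendDataSketch {
    std::shared_ptr<arrow::RecordBatch> Batch;
    uint64_t TabletId = 0;
    std::vector<uint32_t> DataIndexes;   // empty => consume the whole batch
};

// Fan one scan result out to per-consumer events: the batch pointer is shared
// by every event and only the index vectors are moved, so nothing is
// re-serialized or copied row by row at this point.
std::vector<TSendDataSketch> FanOut(const std::shared_ptr<arrow::RecordBatch>& batch,
                                    std::vector<std::vector<uint32_t>>&& splittedByShards,
                                    const uint64_t tabletId) {
    std::vector<TSendDataSketch> events;
    if (splittedByShards.size() > 1) {
        for (auto&& indexes : splittedByShards) {
            events.push_back({batch, tabletId, std::move(indexes)});
        }
    } else {
        events.push_back({batch, tabletId, {}});
    }
    return events;
}
```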