about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-26 18:10:44 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-26 18:29:50 +0300
commit 523f645a83a0ec97a0332dbc3863bb354c92a328 (patch)
tree dd428c50f0479150bff2775db78b278352d2e761
parent c696905d58037337c9b473a04036ab605ec5e3fd (diff)
download ydb-523f645a83a0ec97a0332dbc3863bb354c92a328.tar.gz
KIKIMR-20181: indexed access for repack intervals case
-rw-r--r-- ydb/core/formats/arrow/permutations.cpp 20
-rw-r--r-- ydb/core/formats/arrow/permutations.h 81
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_events.h 14
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp 2
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp 3
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp 7
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_events.h 35
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data.cpp 68
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data.h 58
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data_ut.cpp 6
-rw-r--r-- ydb/core/protos/kqp.proto 1
12 files changed, 196 insertions, 101 deletions
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index 173193eec3..bac97268ac 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -109,7 +109,8 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
return out;
}
-std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes) {
+template <class TIndex>
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutationImpl(const std::vector<TIndex>& indexes) {
if (indexes.empty()) {
return {};
}
@@ -127,6 +128,14 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64
return out;
}
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui32>& indexes) {
+ return MakeFilterPermutationImpl(indexes);
+}
+
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes) {
+ return MakeFilterPermutationImpl(indexes);
+}
+
std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) {
Y_ABORT_UNLESS(!!source);
auto schema = source->schema();
@@ -305,12 +314,13 @@ ui64 TShardedRecordBatch::GetMemorySize() const {
return NArrow::GetBatchMemorySize(RecordBatch);
}
-TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch): RecordBatch(batch) {
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch)
+ : RecordBatch(batch)
+{
AFL_VERIFY(RecordBatch);
- SplittedByShards = {RecordBatch};
}
-TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards)
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::vector<ui32>>&& splittedByShards)
: RecordBatch(batch)
, SplittedByShards(std::move(splittedByShards))
{
@@ -356,7 +366,7 @@ NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shard
Y_ABORT_UNLESS(false);
}
auto resultBatch = NArrow::TStatusValidator::GetValid(input->RemoveColumn(input->schema()->GetFieldIndex(hashColumnName)));
- return TShardedRecordBatch(resultBatch, splitter->Apply(resultBatch));
+ return TShardedRecordBatch(resultBatch, splitter->DetachRemapping());
}
std::shared_ptr<arrow::UInt64Array> TShardingSplitIndex::BuildPermutation() const {
diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h
index 8acf5240fe..4e65e75932 100644
--- a/ydb/core/formats/arrow/permutations.h
+++ b/ydb/core/formats/arrow/permutations.h
@@ -16,15 +16,16 @@ public:
class TShardedRecordBatch {
private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, RecordBatch);
- YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, SplittedByShards);
+ YDB_READONLY_DEF(std::vector<std::vector<ui32>>, SplittedByShards);
public:
TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
void Cut(const ui32 limit) {
RecordBatch = RecordBatch->Slice(0, limit);
for (auto&& i : SplittedByShards) {
- if (i->num_rows() > limit) {
- i = i->Slice(0, limit);
+ auto it = std::lower_bound(i.begin(), i.end(), limit);
+ if (it != i.end()) {
+ i.erase(it, i.end());
}
}
}
@@ -33,15 +34,10 @@ public:
return SplittedByShards.size() > 1;
}
- TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards);
+ TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::vector<ui32>>&& splittedByShards);
void StripColumns(const std::shared_ptr<arrow::Schema>& schema) {
- if (RecordBatch) {
- RecordBatch = NArrow::ExtractColumns(RecordBatch, schema);
- }
- for (auto&& i : SplittedByShards) {
- i = NArrow::ExtractColumns(i, schema);
- }
+ RecordBatch = NArrow::ExtractColumns(RecordBatch, schema);
}
ui64 GetMemorySize() const;
@@ -54,19 +50,67 @@ public:
class TShardingSplitIndex {
private:
ui32 ShardsCount = 0;
- std::vector<std::vector<ui64>> Remapping;
+ std::vector<std::vector<ui32>> Remapping;
ui32 RecordsCount = 0;
+ template <class TIterator>
+ std::vector<ui32> MergeLists(const std::vector<ui32>& base, const TIterator itFrom, const TIterator itTo) {
+ std::vector<ui32> result;
+ result.reserve(base.size() + (itTo - itFrom));
+ auto itBase = base.begin();
+ auto itExt = itFrom;
+ while (itBase != base.end() && itExt != itTo) {
+ if (*itBase < *itExt) {
+ result.emplace_back(*itBase);
+ ++itBase;
+ } else {
+ result.emplace_back(*itExt);
+ ++itExt;
+ }
+ }
+ if (itBase == base.end()) {
+ result.insert(result.end(), itExt, itTo);
+ } else if (itExt == itTo) {
+ result.insert(result.end(), itBase, base.end());
+ }
+ return result;
+ }
+
template <class TIntArrowArray>
void Initialize(const TIntArrowArray& arrowHashArray) {
Y_ABORT_UNLESS(ShardsCount);
Remapping.resize(ShardsCount);
+ const ui32 expectation = arrowHashArray.length() / ShardsCount + 1;
+ for (auto&& i : Remapping) {
+ i.reserve(2 * expectation);
+ }
for (ui64 i = 0; i < (ui64)arrowHashArray.length(); ++i) {
- const auto v = arrowHashArray.GetView(i);
- if (v < 0) {
- Remapping[(-v) % ShardsCount].emplace_back(i);
- } else {
- Remapping[v % ShardsCount].emplace_back(i);
+ const i64 v = arrowHashArray.GetView(i);
+ const ui32 idx = ((v < 0) ? (-v) : v) % ShardsCount;
+ Remapping[idx].emplace_back(i);
+ }
+ std::deque<std::vector<ui32>*> sizeCorrection;
+ for (auto&& i : Remapping) {
+ sizeCorrection.emplace_back(&i);
+ }
+ const auto pred = [](const std::vector<ui32>* l, const std::vector<ui32>* r) {
+ return l->size() < r->size();
+ };
+ std::sort(sizeCorrection.begin(), sizeCorrection.end(), pred);
+ while (sizeCorrection.size() > 1 && sizeCorrection.back()->size() > expectation && sizeCorrection.front()->size() < expectation) {
+ const ui32 uselessRecords = sizeCorrection.back()->size() - expectation;
+ const ui32 needRecords = expectation - sizeCorrection.front()->size();
+ const ui32 moveRecords = std::min<ui32>(needRecords, uselessRecords);
+ if (moveRecords == 0) {
+ break;
+ }
+ *sizeCorrection.front() = MergeLists(*sizeCorrection.front(), sizeCorrection.back()->end() - moveRecords, sizeCorrection.back()->end());
+ sizeCorrection.back()->resize(sizeCorrection.back()->size() - moveRecords);
+ if (sizeCorrection.back()->size() <= expectation) {
+ sizeCorrection.pop_back();
+ }
+ if (sizeCorrection.front()->size() >= expectation) {
+ sizeCorrection.pop_front();
}
}
}
@@ -78,6 +122,10 @@ private:
public:
+ std::vector<std::vector<ui32>> DetachRemapping() {
+ return std::move(Remapping);
+ }
+
template <class TArrayClass>
static TShardingSplitIndex Build(const ui32 shardsCount, const arrow::Array& arrowHashArray) {
TShardingSplitIndex result(shardsCount, arrowHashArray);
@@ -94,6 +142,7 @@ public:
std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool reverse = false);
std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes);
+std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui32>& indexes);
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::RecordBatch> ReverseRecords(const std::shared_ptr<arrow::RecordBatch>& batch);
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index 2d504dd269..f2efd23f21 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -36,7 +36,7 @@ struct TEvKqpCompute {
ui32 Generation;
TVector<TOwnedCellVec> Rows;
std::shared_ptr<arrow::RecordBatch> ArrowBatch;
- std::vector<std::shared_ptr<arrow::RecordBatch>> SplittedBatches;
+ std::vector<std::vector<ui32>> SplittedBatches;
TOwnedCellVec LastKey;
TDuration CpuTime;
@@ -50,12 +50,6 @@ struct TEvKqpCompute {
ui32 GetRowsCount() const {
if (ArrowBatch) {
return ArrowBatch->num_rows();
- } else if (SplittedBatches.size()) {
- ui32 recordsCount = 0;
- for (auto&& i : SplittedBatches) {
- recordsCount += i->num_rows();
- }
- return recordsCount;
} else {
return Rows.size();
}
@@ -113,9 +107,6 @@ struct TEvKqpCompute {
auto schema = NArrow::DeserializeSchema(batch.GetSchema());
ev->ArrowBatch = NArrow::DeserializeBatch(batch.GetBatch(), schema);
}
- for (auto&& i : pbEv->Record.GetSplittedArrowBatches()) {
- ev->SplittedBatches.emplace_back(NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(i)));
- }
return ev.Release();
}
@@ -155,9 +146,6 @@ struct TEvKqpCompute {
break;
}
}
- for (auto&& i : SplittedBatches) {
- Remote->Record.AddSplittedArrowBatches(NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(i));
- }
}
}
};
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 748d1a30cd..1dfe1fbab6 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -221,7 +221,7 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
}
case NKikimrDataEvents::FORMAT_ARROW: {
if(msg.ArrowBatch != nullptr) {
- bytes = ScanData->AddData(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory());
+ bytes = ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.ArrowBatch), {}, TaskRunner->GetHolderFactory());
rowsCount = msg.ArrowBatch->num_rows();
}
break;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index dc046412fd..709d19bd55 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
@@ -129,7 +130,7 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {
auto& msg = *ev->Get();
auto guard = TaskRunner->BindAllocator();
if (!!msg.GetArrowBatch()) {
- ScanData->AddData(*msg.GetArrowBatch(), msg.GetTabletId(), TaskRunner->GetHolderFactory());
+ ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes())), msg.GetTabletId(), TaskRunner->GetHolderFactory());
} else {
ScanData->AddData(std::move(msg.MutableRows()), msg.GetTabletId(), TaskRunner->GetHolderFactory());
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index a5ebda9aeb..4c356d2d58 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -29,11 +29,14 @@ std::vector<std::unique_ptr<TComputeTaskData>> TShardScannerInfo::OnReceiveData(
std::vector<std::unique_ptr<TComputeTaskData>> result;
if (data.SplittedBatches.size() > 1) {
ui32 idx = 0;
+ AFL_ENSURE(data.ArrowBatch);
for (auto&& i : data.SplittedBatches) {
- result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(TabletId, i), idx++));
+ result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId, std::move(i)), idx++));
}
+ } else if (data.ArrowBatch) {
+ result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId)));
} else {
- result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data, TabletId)));
+ result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(std::move(data.Rows), TabletId)));
}
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "receive_data")("count_chunks", result.size());
DataChunksInFlightCount = result.size();
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_events.h b/ydb/core/kqp/compute_actor/kqp_scan_events.h
index b4e638c6e7..d973b31fd3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_events.h
@@ -39,40 +39,29 @@ struct TEvScanExchange {
class TEvSendData: public NActors::TEventLocal<TEvSendData, EvSendData> {
private:
- YDB_ACCESSOR_DEF(std::shared_ptr<arrow::RecordBatch>, ArrowBatch);
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, ArrowBatch);
YDB_ACCESSOR_DEF(TVector<TOwnedCellVec>, Rows);
- YDB_ACCESSOR(ui64, TabletId, 0);
+ YDB_READONLY(ui64, TabletId, 0);
+ YDB_ACCESSOR_DEF(std::vector<ui32>, DataIndexes);
public:
ui32 GetRowsCount() const {
return ArrowBatch ? ArrowBatch->num_rows() : Rows.size();
}
- TEvSendData(const ui64 tabletId, const std::shared_ptr<arrow::RecordBatch>& batch)
- : ArrowBatch(batch)
+ TEvSendData(const std::shared_ptr<arrow::RecordBatch>& arrowBatch, const ui64 tabletId)
+ : ArrowBatch(arrowBatch)
, TabletId(tabletId)
{
+ Y_ABORT_UNLESS(ArrowBatch);
+ Y_ABORT_UNLESS(ArrowBatch->num_rows());
}
- TEvSendData(TEvKqpCompute::TEvScanData& msg, const ui64 tabletId)
- : TabletId(tabletId) {
- switch (msg.GetDataFormat()) {
- case NKikimrDataEvents::FORMAT_CELLVEC:
- case NKikimrDataEvents::FORMAT_UNSPECIFIED:
- Rows = std::move(msg.Rows);
- Y_ABORT_UNLESS(Rows.size());
- break;
- case NKikimrDataEvents::FORMAT_ARROW:
- ArrowBatch = msg.ArrowBatch;
- Y_ABORT_UNLESS(ArrowBatch);
- Y_ABORT_UNLESS(ArrowBatch->num_rows());
- break;
- }
-
- }
-
- TEvSendData(std::shared_ptr<arrow::RecordBatch> arrowBatch, const ui64 tabletId)
+ TEvSendData(const std::shared_ptr<arrow::RecordBatch>& arrowBatch, const ui64 tabletId, std::vector<ui32>&& dataIndexes)
: ArrowBatch(arrowBatch)
- , TabletId(tabletId) {
+ , TabletId(tabletId)
+ , DataIndexes(std::move(dataIndexes))
+ {
+ Y_ABORT_UNLESS(ArrowBatch);
Y_ABORT_UNLESS(ArrowBatch->num_rows());
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 73c3e48b55..8ac2f7bd9d 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -1123,7 +1123,7 @@ public:
} else {
hasResultColumns = true;
stats.AddStatistics(
- NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, column.TypeInfo)
+ NMiniKQL::WriteColumnValuesFromArrow(editAccessors, NMiniKQL::TBatchDataAccessor(result->Get()->GetArrowBatch()), columnIndex, resultColumnIndex, column.TypeInfo)
);
if (column.NotNull) {
std::shared_ptr<arrow::Array> columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex);
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 67cbb800c7..6662889cfc 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -281,19 +281,31 @@ public:
template <class TElementAccessor, class TAccessor>
TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
- const arrow::RecordBatch& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) {
+ const TBatchDataAccessor& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) {
auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(arrayExt);
TElementAccessor::Validate(array);
auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType);
- for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- auto& rowItem = editAccessor(rowIndex, columnIndex);
- if (array.IsNull(rowIndex)) {
+
+ const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) {
+ auto& rowItem = editAccessor(rowIndexTo, columnIndex);
+ if (array.IsNull(rowIndexFrom)) {
statAccumulator.AddNull();
rowItem = NUdf::TUnboxedValue();
} else {
- rowItem = TElementAccessor::ExtractValue(array, rowIndex);
+ rowItem = TElementAccessor::ExtractValue(array, rowIndexFrom);
statAccumulator.AddValue(rowItem);
}
+ };
+
+ if (batch.HasDataIndexes()) {
+ ui32 idx = 0;
+ for (const i64 rowIndex: batch.GetDataIndexes()) {
+ applyToIndex(rowIndex, idx++);
+ }
+ } else {
+ for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
+ applyToIndex(rowIndex, rowIndex);
+ }
}
return statAccumulator.Finish();
}
@@ -301,8 +313,8 @@ TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
template <class TAccessor>
TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
- const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
- std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex);
+ const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
+ std::shared_ptr<arrow::Array> columnSharedPtr = batch.GetBatch()->column(columnIndex);
arrow::Array* columnPtr = columnSharedPtr.get();
namespace NTypeIds = NScheme::NTypeIds;
switch (columnType.GetTypeId()) {
@@ -396,7 +408,7 @@ TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
}
TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType)
+ const TBatchDataAccessor& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType)
{
const auto accessor = [editAccessors, columnsCount](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
return editAccessors[rowIndex * columnsCount + colIndex];
@@ -405,7 +417,7 @@ TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
}
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
+ const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
{
const auto accessor = [&editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
return editAccessors[rowIndex][colIndex];
@@ -414,7 +426,7 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
}
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType)
+ const TBatchDataAccessor& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType)
{
const auto accessor = [=, &editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
YQL_ENSURE(colIndex == columnIndex);
@@ -538,17 +550,17 @@ ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& ba
return stats.AllocatedBytes;
}
-TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId,
const THolderFactory& holderFactory)
{
TBytesStatistics stats;
TUnboxedValueVector cells;
if (TotalColumnsCount == 0u) {
- cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
- stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
+ cells.resize(batch.GetRecordsCount(), holderFactory.GetEmptyContainer());
+ stats.AddStatistics({ sizeof(ui64) * batch.GetRecordsCount(), sizeof(ui64) * batch.GetRecordsCount() });
} else {
- cells.resize(batch.num_rows() * TotalColumnsCount);
+ cells.resize(batch.GetRecordsCount() * TotalColumnsCount);
for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) {
stats.AddStatistics(
@@ -557,16 +569,16 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(con
}
if (!SystemColumns.empty()) {
- for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
+ for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
FillSystemColumns(&cells[rowIndex * TotalColumnsCount + ResultColumns.size()], shardId, SystemColumns);
}
- stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
+ stats.AllocatedBytes += batch.GetRecordsCount() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
}
}
if (cells.size()) {
- RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.num_rows(), std::move(cells), stats.AllocatedBytes));
+ RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.GetRecordsCount(), std::move(cells), stats.AllocatedBytes));
}
StoredBytes += stats.AllocatedBytes;
@@ -574,18 +586,18 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(con
return stats;
}
-TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> /*shardId*/,
+TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> /*shardId*/,
const THolderFactory& holderFactory)
{
TBytesStatistics stats;
auto totalColsCount = TotalColumnsCount + 1;
TUnboxedValueVector batchValues;
batchValues.resize(totalColsCount);
-
- for (int i = 0; i < batch.num_columns(); ++i) {
- batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i)));
+ std::shared_ptr<arrow::RecordBatch> filtered = batch.GetFiltered();
+ for (int i = 0; i < filtered->num_columns(); ++i) {
+ batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(filtered->column_data(i)));
}
- ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(batch);
+ const ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(*filtered);
stats.AddStatistics({batchByteSize, batchByteSize});
// !!! TODO !!!
@@ -597,26 +609,26 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(c
// stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
// }
- batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows())));
- stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
+ batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.GetRecordsCount())));
+ stats.AddStatistics({ sizeof(ui64) * batch.GetRecordsCount(), sizeof(ui64) * batch.GetRecordsCount()});
- BlockBatches.emplace(TBlockBatch(totalColsCount, batch.num_rows(), std::move(batchValues), stats.AllocatedBytes));
+ BlockBatches.emplace(TBlockBatch(totalColsCount, batch.GetRecordsCount(), std::move(batchValues), stats.AllocatedBytes));
StoredBytes += stats.AllocatedBytes;
return stats;
}
-ui64 TKqpScanComputeContext::TScanData::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+ui64 TKqpScanComputeContext::TScanData::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId,
const THolderFactory& holderFactory)
{
// RecordBatch hasn't empty method so check the number of rows
- if (Finished || batch.num_rows() == 0) {
+ if (Finished || batch.GetRecordsCount() == 0) {
return 0;
}
TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
if (BasicStats) {
- BasicStats->Rows += batch.num_rows();
+ BasicStats->Rows += batch.GetRecordsCount();
BasicStats->Bytes += stats.DataBytes;
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index d166fce4ab..224144301d 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -7,6 +7,7 @@
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tablet_flat/flat_database.h>
@@ -17,6 +18,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
namespace NKikimrTxDataShard {
class TKqpTransaction_TScanTaskMeta;
@@ -57,13 +59,55 @@ struct TBytesStatistics {
};
+class TBatchDataAccessor {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
+ YDB_READONLY_DEF(std::vector<ui32>, DataIndexes);
+ mutable std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+public:
+ std::shared_ptr<arrow::RecordBatch> GetFiltered() const {
+ if (!FilteredBatch) {
+ if (DataIndexes.size()) {
+ auto permutation = NArrow::MakeFilterPermutation(DataIndexes);
+ FilteredBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(Batch, permutation)).record_batch();
+ } else {
+ FilteredBatch = Batch;
+ }
+ }
+ return FilteredBatch;
+ }
+
+ bool HasDataIndexes() const {
+ return DataIndexes.size();
+ }
+
+ ui32 GetRecordsCount() const {
+ return DataIndexes.size() ? DataIndexes.size() : Batch->num_rows();
+ }
+
+ TBatchDataAccessor(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<ui32>&& dataIndexes)
+ : Batch(batch)
+ , DataIndexes(std::move(dataIndexes))
+ {
+ AFL_VERIFY(Batch);
+ AFL_VERIFY(Batch->num_rows());
+ }
+
+ TBatchDataAccessor(const std::shared_ptr<arrow::RecordBatch>& batch)
+ : Batch(batch) {
+ AFL_VERIFY(Batch);
+ AFL_VERIFY(Batch->num_rows());
+
+ }
+};
+
TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type);
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
+ const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
+ const TBatchDataAccessor& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType);
+ const TBatchDataAccessor& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType);
void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type);
@@ -98,7 +142,7 @@ public:
}
ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
- ui64 AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
+ ui64 AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
bool IsEmpty() const {
return BatchReader->IsEmpty();
@@ -165,7 +209,7 @@ public:
}
virtual TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
- virtual TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
+ virtual TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
virtual ui32 FillDataValues(NUdf::TUnboxedValue* const* result) = 0;
virtual void Clear() = 0;
virtual bool IsEmpty() const = 0;
@@ -191,7 +235,7 @@ public:
}
TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
- TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
void Clear() override {
@@ -256,7 +300,7 @@ public:
}
TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
- TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ TBytesStatistics AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
void Clear() override {
diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
index bd7ac9eef2..508d10cb6e 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
@@ -249,7 +249,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, rows.front().Columns());
- scanData.AddData(*batch, {}, factory);
+ scanData.AddData(batch, {}, factory);
std::vector<NUdf::TUnboxedValue> container;
container.resize(20);
@@ -305,7 +305,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
resultCols.push_back(resCol);
TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, resultCols);
- scanData.AddData(*batch, {}, factory);
+ scanData.AddData(batch, {}, factory);
std::vector<NUdf::TUnboxedValue> container;
container.resize(1);
@@ -353,7 +353,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
TVector<TDataRow> rows = TestRows();
std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows, rows.front().MakeArrowSchema());
- auto bytes = scanData.AddData(*anotherEmptyBatch, {}, factory);
+ auto bytes = scanData.AddData(anotherEmptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
std::vector<NUdf::TUnboxedValue*> containerPtr;
for (const auto& row: rows) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index e04ecff03f..5a5abc93ff 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -610,7 +610,6 @@ message TEvRemoteScanData {
optional bool RequestedBytesLimitReached = 11 [default = false];
optional uint32 AvailablePacks = 12;
- repeated bytes SplittedArrowBatches = 13;
}
message TEvRemoteScanDataAck {