summary refs log tree commit diff stats
diff options
context:
space:
mode:
author mrlolthe1st <[email protected]> 2025-06-20 20:34:12 +0300
committer mrlolthe1st <[email protected]> 2025-06-20 20:48:54 +0300
commit 949bebfd24cf8d844cf22d944596a8fe957e6417 (patch)
tree 5febec6735b9ae3bfacb77dd1c53a863973b8d0d
parent caec1b0ede2b901bcf006a403d15e38e1a94b659 (diff)
YQL-20059: Remove dechunking from fallback handler
commit_hash:20bcd0062beca6963b3c420835a33b9873f43ab2
-rw-r--r-- yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp 47
1 files changed, 6 insertions, 41 deletions
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
index 50e7a651507..fa5ac37976b 100644
--- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
+++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
@@ -230,48 +230,15 @@ public:
++RowsCnt_;
}
- std::vector<TResultBatch::TPtr> Build() {
+ TResultBatch::TPtr Build() {
std::vector<arrow::Datum> columns;
columns.reserve(ColumnBuilders_.size());
for (size_t i = 0; i < ColumnBuilders_.size(); ++i) {
columns.emplace_back(std::move(ColumnBuilders_[i]->Build(false)));
}
- std::vector<std::shared_ptr<TResultBatch>> blocks;
- int64_t offset = 0;
- std::vector<int64_t> currentChunk(columns.size()), inChunkOffset(columns.size());
- while (RowsCnt_) {
- int64_t max_curr_len = RowsCnt_;
- for (size_t i = 0; i < columns.size(); ++i) {
- if (arrow::Datum::Kind::CHUNKED_ARRAY == columns[i].kind()) {
- auto& c_arr = columns[i].chunked_array();
- while (currentChunk[i] < c_arr->num_chunks() && !c_arr->chunk(currentChunk[i])) {
- ++currentChunk[i];
- }
- YQL_ENSURE(currentChunk[i] < c_arr->num_chunks());
- max_curr_len = std::min(max_curr_len, c_arr->chunk(currentChunk[i])->length() - inChunkOffset[i]);
- }
- }
- RowsCnt_ -= max_curr_len;
- decltype(columns) result_columns;
- result_columns.reserve(columns.size());
- offset += max_curr_len;
- for (size_t i = 0; i < columns.size(); ++i) {
- auto& e = columns[i];
- if (arrow::Datum::Kind::CHUNKED_ARRAY == e.kind()) {
- result_columns.emplace_back(e.chunked_array()->chunk(currentChunk[i])->Slice(inChunkOffset[i], max_curr_len));
- if (max_curr_len + inChunkOffset[i] == e.chunked_array()->chunk(currentChunk[i])->length()) {
- ++currentChunk[i];
- inChunkOffset[i] = 0;
- } else {
- inChunkOffset[i] += max_curr_len;
- }
- } else {
- result_columns.emplace_back(e.array()->Slice(offset - max_curr_len, max_curr_len));
- }
- }
- blocks.emplace_back(std::make_shared<TResultBatch>(max_curr_len, std::move(result_columns)));
- }
- return blocks;
+ auto res = std::make_shared<TResultBatch>(RowsCnt_, std::move(columns));
+ RowsCnt_ = 0;
+ return res;
}
private:
@@ -480,9 +447,7 @@ public:
}
}
if (payload) {
- for (auto &e: FallbackHandler(inputIdx, payload)) {
- Listener_->HandleFallback(std::move(e));
- }
+ Listener_->HandleFallback(FallbackHandler(inputIdx, payload));
InputDone(inputIdx);
RunRead();
}
@@ -494,7 +459,7 @@ public:
}
}
- std::vector<TResultBatch::TPtr> FallbackHandler(size_t idx, NYT::TSharedRef payload) {
+ TResultBatch::TPtr FallbackHandler(size_t idx, NYT::TSharedRef payload) {
if (!payload.Size()) {
return {};
}