diff options
| author | mrlolthe1st <[email protected]> | 2025-06-20 20:34:12 +0300 |
|---|---|---|
| committer | mrlolthe1st <[email protected]> | 2025-06-20 20:48:54 +0300 |
| commit | 949bebfd24cf8d844cf22d944596a8fe957e6417 (patch) | |
| tree | 5febec6735b9ae3bfacb77dd1c53a863973b8d0d | |
| parent | caec1b0ede2b901bcf006a403d15e38e1a94b659 (diff) | |
YQL-20059: Remove dechunking from fallback handler
commit_hash:20bcd0062beca6963b3c420835a33b9873f43ab2
| -rw-r--r-- | yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp | 47 |
1 files changed, 6 insertions, 41 deletions
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index 50e7a651507..fa5ac37976b 100644 --- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -230,48 +230,15 @@ public: ++RowsCnt_; } - std::vector<TResultBatch::TPtr> Build() { + TResultBatch::TPtr Build() { std::vector<arrow::Datum> columns; columns.reserve(ColumnBuilders_.size()); for (size_t i = 0; i < ColumnBuilders_.size(); ++i) { columns.emplace_back(std::move(ColumnBuilders_[i]->Build(false))); } - std::vector<std::shared_ptr<TResultBatch>> blocks; - int64_t offset = 0; - std::vector<int64_t> currentChunk(columns.size()), inChunkOffset(columns.size()); - while (RowsCnt_) { - int64_t max_curr_len = RowsCnt_; - for (size_t i = 0; i < columns.size(); ++i) { - if (arrow::Datum::Kind::CHUNKED_ARRAY == columns[i].kind()) { - auto& c_arr = columns[i].chunked_array(); - while (currentChunk[i] < c_arr->num_chunks() && !c_arr->chunk(currentChunk[i])) { - ++currentChunk[i]; - } - YQL_ENSURE(currentChunk[i] < c_arr->num_chunks()); - max_curr_len = std::min(max_curr_len, c_arr->chunk(currentChunk[i])->length() - inChunkOffset[i]); - } - } - RowsCnt_ -= max_curr_len; - decltype(columns) result_columns; - result_columns.reserve(columns.size()); - offset += max_curr_len; - for (size_t i = 0; i < columns.size(); ++i) { - auto& e = columns[i]; - if (arrow::Datum::Kind::CHUNKED_ARRAY == e.kind()) { - result_columns.emplace_back(e.chunked_array()->chunk(currentChunk[i])->Slice(inChunkOffset[i], max_curr_len)); - if (max_curr_len + inChunkOffset[i] == e.chunked_array()->chunk(currentChunk[i])->length()) { - ++currentChunk[i]; - inChunkOffset[i] = 0; - } else { - inChunkOffset[i] += max_curr_len; - } - } else { - result_columns.emplace_back(e.array()->Slice(offset - max_curr_len, max_curr_len)); - } - } - blocks.emplace_back(std::make_shared<TResultBatch>(max_curr_len, std::move(result_columns))); - } - return blocks; + auto res = std::make_shared<TResultBatch>(RowsCnt_, std::move(columns)); + RowsCnt_ = 0; + return res; } private: @@ -480,9 +447,7 @@ public: } } if (payload) { - for (auto &e: FallbackHandler(inputIdx, payload)) { - Listener_->HandleFallback(std::move(e)); - } + Listener_->HandleFallback(FallbackHandler(inputIdx, payload)); InputDone(inputIdx); RunRead(); } @@ -494,7 +459,7 @@ public: } } - std::vector<TResultBatch::TPtr> FallbackHandler(size_t idx, NYT::TSharedRef payload) { + TResultBatch::TPtr FallbackHandler(size_t idx, NYT::TSharedRef payload) { if (!payload.Size()) { return {}; } |
