blob: a2d4dc7bb809ce9a90eebb316fa1d0adc5ccc7dd (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
#include "columnshard__index_scan.h"
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/conveyor/usage/events.h>
namespace NKikimr::NColumnShard {
// Builds a scan iterator over the given read context and read metadata.
//
// context      - shared read context; also supplies the counters used to construct ReadyResults.
// readMetadata - metadata used to build the reader stored in IndexedData.
//
// Aborts the process if the read metadata reachable through the context does not report
// itself as sorted. If readMetadata reports Empty(), the freshly built reader is aborted
// right away.
TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<NOlap::TReadContext>& context, const NOlap::TReadMetadata::TConstPtr& readMetadata)
    : Context(context)
    , ReadMetadata(readMetadata)
    , ReadyResults(context->GetCounters())
{
    // The reader is built first; the sortedness check below runs only afterwards.
    IndexedData = readMetadata->BuildReader(Context);
    Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted());

    const bool nothingToRead = readMetadata->Empty();
    if (nothingToRead) {
        IndexedData->Abort();
    }
}
// Refreshes ReadyResults from the reader and returns whatever ReadyResults.pop_front() yields.
std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() {
    // Pull newly available results into the queue before popping.
    FillReadyResults();
    std::optional<NOlap::TPartialReadResult> front = ReadyResults.pop_front();
    return front;
}
// Moves any results the reader has ready into ReadyResults without popping anything.
// This is a plain forwarder to FillReadyResults().
void TColumnShardScanIterator::PrepareResults() {
    FillReadyResults();
}
// Delegates to the reader's ReadNextInterval() and returns its result unchanged.
bool TColumnShardScanIterator::ReadNextInterval() {
    const bool readerResult = IndexedData->ReadNextInterval();
    return readerResult;
}
// Extracts ready results from the reader and appends them to ReadyResults while enforcing
// the row limit stored in the read metadata.
//
// A Limit of 0 is treated as "no limit" (the budget starts at INT64_MAX). Otherwise the
// budget is Limit minus the rows already accounted for in ItemsRead. A result that would
// exceed the budget is cut down to it. Once the budget reaches zero, any remaining
// extracted results are not queued and the reader is aborted.
void TColumnShardScanIterator::FillReadyResults() {
    auto extracted = IndexedData->ExtractReadyResults(MaxRowsInBatch);

    i64 limitLeft = INT64_MAX;
    if (Context->GetReadMetadata()->Limit != 0) {
        limitLeft = Context->GetReadMetadata()->Limit - ItemsRead;
    }

    for (size_t idx = 0; idx < extracted.size(); ++idx) {
        if (!limitLeft) {
            // Budget exhausted: the rest of the extracted results are dropped.
            break;
        }
        auto& batch = ReadyResults.emplace_back(std::move(extracted[idx]));
        if (batch.GetResultBatch().num_rows() > limitLeft) {
            batch.Cut(limitLeft);
        }
        // Row count is taken after the optional cut, so it never exceeds the budget.
        const auto rowsTaken = batch.GetResultBatch().num_rows();
        limitLeft -= rowsTaken;
        ItemsRead += rowsTaken;
    }

    if (limitLeft == 0) {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead);
        IndexedData->Abort();
    }
}
// Destructor: aborts the reader held in IndexedData, then calls PrintToLog() on the
// ReadStats object reachable through ReadMetadata.
TColumnShardScanIterator::~TColumnShardScanIterator() {
// Abort() is invoked unconditionally here, so it may run on a reader that the constructor
// or FillReadyResults() has already aborted.
IndexedData->Abort();
ReadMetadata->ReadStats->PrintToLog();
}
// Applies a processed data task to the reader.
//
// task - task whose Apply() is invoked with the reader held in IndexedData.
//
// Does nothing when the reader already reports IsFinished(). Otherwise the task is applied,
// and the process aborts if task->Apply() returns a false value.
void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
    if (IndexedData->IsFinished()) {
        // Reader is finished: the task is skipped.
        return;
    }
    Y_ABORT_UNLESS(task->Apply(*IndexedData));
}
}
|