aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/tx/columnshard/columnshard__index_scan.cpp
blob: a2d4dc7bb809ce9a90eebb316fa1d0adc5ccc7dd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "columnshard__index_scan.h"
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/conveyor/usage/events.h>

namespace NKikimr::NColumnShard {

TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<NOlap::TReadContext>& context, const NOlap::TReadMetadata::TConstPtr& readMetadata)
    : Context(context)
    , ReadMetadata(readMetadata)
    , ReadyResults(context->GetCounters())
{
    IndexedData = readMetadata->BuildReader(Context);
    Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted());

    if (readMetadata->Empty()) {
        IndexedData->Abort();
    }
}

std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() {
    FillReadyResults();
    return ReadyResults.pop_front();
}

void TColumnShardScanIterator::PrepareResults() {
    FillReadyResults();
}

bool TColumnShardScanIterator::ReadNextInterval() {
    return IndexedData->ReadNextInterval();
}

void TColumnShardScanIterator::FillReadyResults() {
    auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch);
    i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit - ItemsRead;
    for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
        auto& batch = ReadyResults.emplace_back(std::move(ready[i]));
        if (batch.GetResultBatch().num_rows() > limitLeft) {
            batch.Cut(limitLeft);
        }
        limitLeft -= batch.GetResultBatch().num_rows();
        ItemsRead += batch.GetResultBatch().num_rows();
    }

    if (limitLeft == 0) {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead);
        IndexedData->Abort();
    }
}

TColumnShardScanIterator::~TColumnShardScanIterator() {
    IndexedData->Abort();
    ReadMetadata->ReadStats->PrintToLog();
}

void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
    if (!IndexedData->IsFinished()) {
        Y_ABORT_UNLESS(task->Apply(*IndexedData));
    }
}

}