#pragma once

#include "columnshard__scan.h"
#include "columnshard_common.h"
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>

namespace NKikimr::NColumnShard {
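
// Resolves column ids coming from the scan program into column names using the
// table's index schema.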
class TIndexColumnResolver : public IColumnResolver {
    const NOlap::TIndexInfo& IndexInfo;

public:
    explicit TIndexColumnResolver(const NOlap::TIndexInfo& indexInfo)
        : IndexInfo(indexInfo)
    {}

    TString GetColumnName(ui32 id, bool required) const override {
        return IndexInfo.GetColumnName(id, required);
    }
};

using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
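
// Iterator over the data of one ColumnShard read. The owning scan actor asks for
// blob ranges to fetch via GetNextBlobToRead(), feeds the fetched data back through
// AddData(), and drains assembled result batches with GetBatch() until Finished()
// returns true.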
class TColumnShardScanIterator : public TScanIteratorBase {
    NOlap::TReadMetadata::TConstPtr ReadMetadata;
    NOlap::TIndexedReadData IndexedData;
    THashMap<TBlobRange, ui64> IndexedBlobs; // blobRange -> granule
    THashSet<TBlobRange> WaitIndexed;
    THashMap<ui64, THashSet<TBlobRange>> GranuleBlobs; // granule -> blobs
    THashMap<TUnifiedBlobId, ui32> WaitCommitted; // committed blobId -> batchNo
    TVector<TBlobRange> BlobsToRead;
    ui64 NextBlobIdxToRead = 0;
    TDeque<NOlap::TPartialReadResult> ReadyResults;
    bool IsReadFinished = false;
    ui64 ItemsRead = 0;
    const i64 MaxRowsInBatch = 5000;

public:
    TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata)
        : ReadMetadata(readMetadata)
        , IndexedData(ReadMetadata)
    {
        ui32 batchNo = 0;
        for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) {
            const TUnifiedBlobId& blobId = ReadMetadata->CommittedBlobs[i];
            WaitCommitted.emplace(blobId, batchNo);
        }

        IndexedBlobs = IndexedData.InitRead(batchNo, true);
        for (auto& [blobId, granule] : IndexedBlobs) {
            WaitIndexed.insert(blobId);
            GranuleBlobs[granule].insert(blobId);
        }

        // Read all committed blobs
        for (const auto& blobId : ReadMetadata->CommittedBlobs) {
            BlobsToRead.push_back(TBlobRange(blobId, 0, blobId.BlobSize()));
        }

        Y_VERIFY(ReadMetadata->IsSorted());

        // Read all indexed blobs (in correct order)
        auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
        for (ui64 granule : granulesOrder) {
            auto& blobs = GranuleBlobs[granule];
            BlobsToRead.insert(BlobsToRead.end(), blobs.begin(), blobs.end());
        }

        IsReadFinished = ReadMetadata->Empty();
    }
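
    // Called when a requested blob range has been fetched: indexed data is handed
    // to IndexedData.AddIndexedColumn(), committed (not yet indexed) blobs go to
    // AddNotIndexed() under the batch number assigned in the constructor.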
    void AddData(const TBlobRange& blobRange, TString data) override {
        const auto& blobId = blobRange.BlobId;

        if (IndexedBlobs.count(blobRange)) {
            if (!WaitIndexed.count(blobRange)) {
                return; // ignore duplicate parts
            }
            WaitIndexed.erase(blobRange);
            IndexedData.AddIndexedColumn(blobRange, data);
        } else if (WaitCommitted.count(blobId)) {
            ui32 batchNo = WaitCommitted[blobId];
            WaitCommitted.erase(blobId);
            IndexedData.AddNotIndexed(batchNo, data);
        }
    }
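
    // The scan is done once the read has finished (all data consumed or the row
    // limit reached) and every assembled batch has been handed out.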
    bool Finished() const override {
        return IsReadFinished && ReadyResults.empty();
    }

    NOlap::TPartialReadResult GetBatch() override {
        FillReadyResults();

        if (ReadyResults.empty()) {
            return {};
        }

        auto result(std::move(ReadyResults.front()));
        ReadyResults.pop_front();
        return result;
    }

    TBlobRange GetNextBlobToRead() override {
        if (IsReadFinished || NextBlobIdxToRead == BlobsToRead.size()) {
            return TBlobRange();
        }
        const auto& blob = BlobsToRead[NextBlobIdxToRead];
        ++NextBlobIdxToRead;
        return blob;
    }

    size_t ReadyResultsCount() const override {
        return ReadyResults.size();
    }

private:
    // Move batches that IndexedData has fully assembled into ReadyResults,
    // enforcing the row limit from ReadMetadata and marking the read finished
    // when everything has been consumed or the limit is reached.
    void FillReadyResults() {
        auto ready = IndexedData.GetReadyResults(MaxRowsInBatch);
        i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead;

        for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
            if (ready[i].ResultBatch->num_rows() == 0 && !ready[i].LastReadKey) {
                Y_VERIFY(i + 1 == ready.size(), "Only last batch can be empty!");
                break;
            }

            ReadyResults.emplace_back(std::move(ready[i]));
            auto& batch = ReadyResults.back();
            if (batch.ResultBatch->num_rows() > limitLeft) {
                // Trim the last batch if the total row count exceeds the requested limit
                batch.ResultBatch = batch.ResultBatch->Slice(0, limitLeft);
                ready.clear();
            }
            limitLeft -= batch.ResultBatch->num_rows();
            ItemsRead += batch.ResultBatch->num_rows();
        }

        if (limitLeft == 0) {
            WaitCommitted.clear();
            WaitIndexed.clear();
            IsReadFinished = true;
        }

        if (WaitCommitted.empty() && WaitIndexed.empty() && NextBlobIdxToRead == BlobsToRead.size()) {
            IsReadFinished = true;
        }
    }
};

}