about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-06-30 17:48:32 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-06-30 17:48:32 +0300
commit 6b6fa4bd8c71c7b69c40f2a9c7cee8b77387d4e8 (patch)
tree 3a54ee4ba82033d0e3fcf253b05dab04c09d19a2
parent ebd5d895f93c2ae9bfaf21f84fb0a9e623f4b104 (diff)
download ydb-6b6fa4bd8c71c7b69c40f2a9c7cee8b77387d4e8.tar.gz
incapsulate zero granule fetching logic into reader
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.cpp 36
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.h 6
-rw-r--r-- ydb/core/tx/columnshard/counters/common/object_counter.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 43
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 46
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 6
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 77
7 files changed, 85 insertions, 133 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index fd7e8d47815..ccbb1217c75 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -9,27 +9,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
, ReadMetadata(readMetadata)
, IndexedData(ReadMetadata, false, context)
{
- ui32 batchNo = 0;
- for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) {
- const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
- WaitCommitted.emplace(cmtBlob, batchNo);
- }
- IndexedData.InitRead(batchNo);
- // Add cached batches without read
- for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
- Y_VERIFY(!cmt.empty());
-
- const NOlap::TCommittedBlob& cmtBlob = cmt.key();
- ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob);
- }
- // Read all remained committed blobs
- for (const auto& [cmtBlob, _] : WaitCommitted) {
- auto& blobId = cmtBlob.GetBlobId();
- IndexedData.AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize()));
- }
-
+ IndexedData.InitRead();
Y_VERIFY(ReadMetadata->IsSorted());
if (ReadMetadata->Empty()) {
@@ -38,16 +18,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
}
void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) {
- const auto& blobId = blobRange.BlobId;
- if (IndexedData.IsIndexedBlob(blobRange)) {
- IndexedData.AddIndexed(blobRange, data);
- } else {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
- Y_VERIFY(!cmt.empty());
- const NOlap::TCommittedBlob& cmtBlob = cmt.key();
- ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, data, cmtBlob);
- }
+ IndexedData.AddData(blobRange, data);
}
NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
@@ -87,11 +58,10 @@ void TColumnShardScanIterator::FillReadyResults() {
}
if (limitLeft == 0) {
- WaitCommitted.clear();
IndexedData.Abort();
}
- if (WaitCommitted.empty() && IndexedData.IsFinished()) {
+ if (IndexedData.IsFinished()) {
Context.MutableProcessor().Stop();
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index c41ecc88507..4484ee85744 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -44,6 +44,12 @@ public:
return ReadyResults.size();
}
+ virtual TString DebugString() const override {
+ return TStringBuilder()
+ << "indexed_data:(" << IndexedData.DebugString() << ")"
+ ;
+ }
+
virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override;
virtual bool HasWaitingTasks() const override;
diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.h b/ydb/core/tx/columnshard/counters/common/object_counter.h
index b3cea6c284d..2cef57b7a11 100644
--- a/ydb/core/tx/columnshard/counters/common/object_counter.h
+++ b/ydb/core/tx/columnshard/counters/common/object_counter.h
@@ -44,7 +44,7 @@ public:
}
Counter.Inc();
if (UseLogs) {
- AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "create")("object_type", TypeName<TObject>())("count", Counter.Val());
+ ACFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "create")("object_type", TypeName<TObject>())("count", Counter.Val());
}
}
~TMonitoringObjectsCounter() {
@@ -53,7 +53,7 @@ public:
}
Counter.Dec();
if (UseLogs) {
- AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "destroy")("object_type", TypeName<TObject>())("count", Counter.Val());
+ ACFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "destroy")("object_type", TypeName<TObject>())("count", Counter.Val());
}
}
};
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 9ebde609884..ab1f584a0e3 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -104,7 +104,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
- Y_VERIFY(IndexedBlobs.emplace(range).second);
Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
if (batch.GetFetchedInfo().GetFilter()) {
Context.GetCounters().PostFilterBytes->Add(range.Size);
@@ -117,15 +116,28 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
}
}
-void TIndexedReadData::InitRead(ui32 inputBatch) {
+void TIndexedReadData::RegisterZeroGranula() {
+ for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i) {
+ const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
+ WaitCommitted.emplace(cmtBlob.GetBlobId(), cmtBlob);
+ }
+ for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
+ AddNotIndexed(blobId, batch);
+ }
+ for (const auto& [blobId, _] : WaitCommitted) {
+ AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize()));
+ }
+}
+
+void TIndexedReadData::InitRead() {
+ RegisterZeroGranula();
+
auto& indexInfo = ReadMetadata->GetIndexInfo();
Y_VERIFY(indexInfo.GetSortingKey());
Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields());
SortReplaceDescription = indexInfo.SortReplaceDescription();
- NotIndexed.resize(inputBatch);
-
Y_VERIFY(!GranulesContext);
GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
ui64 portionsBytes = 0;
@@ -161,10 +173,7 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
NIndexedReader::TBatch* portionBatch = nullptr;
{
auto it = IndexedBlobSubscriber.find(blobRange);
- Y_VERIFY_DEBUG(it != IndexedBlobSubscriber.end());
- if (it == IndexedBlobSubscriber.end()) {
- return;
- }
+ Y_VERIFY(it != IndexedBlobSubscriber.end());
portionBatch = it->second;
IndexedBlobSubscriber.erase(it);
}
@@ -203,7 +212,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t
Y_VERIFY(SortReplaceDescription);
auto& indexInfo = ReadMetadata->GetIndexInfo();
- if (NotIndexed.size() != ReadyNotIndexed) {
+ if (WaitCommitted.size()) {
// Wait till we have all not indexed data so we could replace keys in granules
return {};
}
@@ -235,7 +244,6 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t
GranulesContext->DrainNotIndexedBatches(nullptr);
}
NotIndexed.clear();
- ReadyNotIndexed = 0;
} else {
GranulesContext->DrainNotIndexedBatches(nullptr);
}
@@ -480,4 +488,19 @@ NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() {
}
}
+void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) {
+ auto it = WaitCommitted.find(blobRange.BlobId);
+ Y_VERIFY(it != WaitCommitted.end());
+ auto batch = NArrow::DeserializeBatch(column, ReadMetadata->GetBlobSchema(it->second.GetSchemaSnapshot()));
+ NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot()));
+ WaitCommitted.erase(it);
+}
+
+void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch) {
+ auto it = WaitCommitted.find(blobId);
+ Y_VERIFY(it != WaitCommitted.end());
+ NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot()));
+ WaitCommitted.erase(it);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 527b7d8a0cf..3638ebd0d7f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -21,6 +21,7 @@ class TIndexedReadData {
private:
TReadContext Context;
std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
+ THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> WaitCommitted;
TFetchBlobsQueue FetchBlobsQueue;
TFetchBlobsQueue PriorityBlobsQueue;
@@ -28,15 +29,29 @@ private:
bool OnePhaseReadMode = false;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
- THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
- THashSet<TBlobRange> IndexedBlobs;
- ui32 ReadyNotIndexed{ 0 };
- std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; // outscope granules batch
+ THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber;
+ std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
+ void AddNotIndexed(const TBlobRange& blobRange, const TString& column);
+ void AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch);
+ void RegisterZeroGranula();
+
+ void AddIndexed(const TBlobRange& blobRange, const TString& column);
+ bool IsIndexedBlob(const TBlobRange& blobRange) const {
+ return IndexedBlobSubscriber.contains(blobRange);
+ }
public:
TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const bool internalRead, const TReadContext& context);
+ TString DebugString() const {
+ return TStringBuilder()
+ << "internal:" << OnePhaseReadMode << ";"
+ << "wait_committed:" << WaitCommitted.size() << ";"
+ << "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");"
+ ;
+ }
+
const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
return Context.GetCounters();
}
@@ -50,30 +65,21 @@ public:
return *GranulesContext;
}
- /// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently
- void InitRead(ui32 numNotIndexed);
+ void InitRead();
void Abort();
bool IsFinished() const;
/// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
- void AddNotIndexed(ui32 batchNo, TString blob, const TCommittedBlob& commitedBlob) {
- auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(commitedBlob.GetSchemaSnapshot()));
- AddNotIndexed(batchNo, batch, commitedBlob);
- }
-
- void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TCommittedBlob& commitedBlob) {
- Y_VERIFY(batchNo < NotIndexed.size());
- Y_VERIFY(!NotIndexed[batchNo]);
- ++ReadyNotIndexed;
- NotIndexed[batchNo] = MakeNotIndexedBatch(batch, commitedBlob.GetSchemaSnapshot());
+ void AddData(const TBlobRange& blobRange, const TString& data) {
+ if (IsIndexedBlob(blobRange)) {
+ AddIndexed(blobRange, data);
+ } else {
+ AddNotIndexed(blobRange, data);
+ }
}
- void AddIndexed(const TBlobRange& blobRange, const TString& column);
- bool IsIndexedBlob(const TBlobRange& blobRange) const {
- return IndexedBlobs.contains(blobRange);
- }
NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
return ReadMetadata;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 571f88f1698..64f5f60fed6 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -37,6 +37,12 @@ public:
bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range);
TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading);
+ TString DebugString() const {
+ return TStringBuilder()
+ << "sorting_policy:(" << SortingPolicy->DebugString() << ");"
+ ;
+ }
+
void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept {
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 6d54d0618d2..5c90910b8b4 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -14,7 +14,7 @@ private:
size_t next = 1;
for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
- bool lastOne = Finished() && (next == ready.size());
+ const bool lastOne = IndexedData.IsFinished() && (next == ready.size());
SendResult(ctx, it->ResultBatch, lastOne);
}
}
@@ -58,25 +58,7 @@ public:
Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size);
- if (IndexedBlobs.contains(event.BlobRange)) {
- if (!WaitIndexed.contains(event.BlobRange)) {
- return; // ignore duplicate parts
- }
- WaitIndexed.erase(event.BlobRange);
- IndexedData.AddIndexed(event.BlobRange, event.Data);
- } else if (CommittedBlobs.contains(blobId)) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
- if (cmt.empty()) {
- return; // ignore duplicates
- }
- const NOlap::TCommittedBlob& cmtBlob = cmt.key();
- ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob);
- } else {
- LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet "
- << TabletId << " (read)");
- return;
- }
+ IndexedData.AddData(event.BlobRange, event.Data);
BuildResult(ctx);
DieFinished(ctx);
@@ -93,9 +75,7 @@ public:
void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) {
Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS);
SendResult(ctx, {}, true, status);
-
- WaitIndexed.clear();
- WaitCommitted.clear();
+ IndexedData.Abort();
}
void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false,
@@ -151,12 +131,8 @@ public:
ctx.Send(DstActor, chunkEvent.release());
}
- bool Finished() const {
- return WaitCommitted.empty() && WaitIndexed.empty();
- }
-
void DieFinished(const TActorContext& ctx) {
- if (Finished()) {
+ if (IndexedData.IsFinished()) {
LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << TabletId);
Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie));
Die(ctx);
@@ -164,34 +140,9 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- ui32 notIndexed = 0;
- for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) {
- const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
-
- CommittedBlobs.emplace(cmtBlob.GetBlobId());
- WaitCommitted.emplace(cmtBlob, notIndexed);
- }
-
- IndexedData.InitRead(notIndexed);
- while (IndexedData.HasMoreBlobs()) {
- const auto blobRange = IndexedData.ExtractNextBlob();
- WaitIndexed.insert(blobRange);
- IndexedBlobs.emplace(blobRange);
- }
-
- // Add cached batches without read
- for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
- Y_VERIFY(!cmt.empty());
+ IndexedData.InitRead();
- const NOlap::TCommittedBlob& cmtBlob = cmt.key();
- ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob);
- }
-
- LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, "
- << ReadMetadata->CommittedBlobs.size() << " committed, "
- << WaitCommitted.size() << " not cached committed) at tablet " << TabletId);
+ LOG_S_DEBUG("Starting read (" << IndexedData.DebugString() << ") at tablet " << TabletId);
bool earlyExit = false;
if (Deadline != TInstant::Max()) {
@@ -208,17 +159,11 @@ public:
SendTimeouts(ctx);
ctx.Send(SelfId(), new TEvents::TEvPoisonPill());
} else {
- // TODO: Keep inflight
- for (auto& [cmtBlob, batchNo] : WaitCommitted) {
- auto& blobId = cmtBlob.GetBlobId();
- SendReadRequest(ctx, NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()));
- }
- for (auto&& blobRange : IndexedBlobs) {
+ while (IndexedData.HasMoreBlobs()) {
+ const auto blobRange = IndexedData.ExtractNextBlob();
SendReadRequest(ctx, blobRange);
}
- if (WaitCommitted.empty() && IndexedBlobs.empty()) {
- BuildResult(ctx);
- }
+ BuildResult(ctx);
}
Become(&TThis::StateWait);
@@ -262,10 +207,6 @@ private:
TInstant Deadline;
TActorId ColumnShardActorId;
const ui64 RequestCookie;
- THashSet<NBlobCache::TBlobRange> IndexedBlobs;
- THashSet<TUnifiedBlobId> CommittedBlobs;
- THashSet<NBlobCache::TBlobRange> WaitIndexed;
- std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted;
ui32 ReturnedBatchNo;
mutable TString SerializedSchema;