aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-28 12:37:05 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-28 12:37:05 +0300
commitc270c8ce912c36e9639202e4cafa1fa5261c9925 (patch)
tree2e9e0a3ebab3b557a4098ae397f1898935287678
parenta86f9ad8d083e89a2137769078aff95bd4c6827a (diff)
downloadydb-c270c8ce912c36e9639202e4cafa1fa5261c9925.tar.gz
one request for many blobs
additional stats info; 100 MB limit for data in flight
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp11
-rw-r--r--ydb/core/tx/columnshard/blob_cache.h5
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp182
3 files changed, 169 insertions, 29 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index 6e38d8ecb03..25e28d14b8e 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -210,7 +210,7 @@ private:
const bool promote = (i64)MaxCacheDataSize && ev->Get()->ReadOptions.CacheAfterRead;
const bool fallback = ev->Get()->ReadOptions.ForceFallback;
- LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback);
+ LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback << " sender:" << ev->Sender);
TReadItem readItem(ev->Get()->ReadOptions, blobRange);
HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
@@ -226,9 +226,10 @@ private:
if (it != Cache.End()) {
Hits->Inc();
HitsBytes->Add(blobRange.Size);
- return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx);
+ return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true);
}
+ LOG_S_DEBUG("Miss cache: " << blobRange << " sender:" << sender);
Misses->Inc();
// Prevent full cache flushing by exported blobs. Decrease probability of caching depending on cache size.
@@ -403,7 +404,7 @@ private:
ReadsInQueue->Set(ReadQueue.size());
- // We might need to free some space to accomodate the results of new reads
+ // We might need to free some space to accommodate the results of new reads
Evict(ctx);
std::vector<ui64> tabletReads;
@@ -463,10 +464,10 @@ private:
}
void SendResult(const TActorId& to, const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, // Logs and sends a TEvReadBlobRangeResult for blobRange to actor `to`.
- const TString& data, const TActorContext& ctx) {
+ const TString& data, const TActorContext& ctx, const bool fromCache = false) { // fromCache defaults to false, so callers that do not pass it keep compiling unchanged
LOG_S_DEBUG("Send result: " << blobRange << " to: " << to << " status: " << status);
- ctx.Send(to, new TEvBlobCache::TEvReadBlobRangeResult(blobRange, status, data));
+ ctx.Send(to, new TEvBlobCache::TEvReadBlobRangeResult(blobRange, status, data, fromCache)); // fromCache is forwarded into the result event
}
void Handle(TEvBlobStorage::TEvGetResult::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h
index fb6ab9db2fd..05661dd7bf2 100644
--- a/ydb/core/tx/columnshard/blob_cache.h
+++ b/ydb/core/tx/columnshard/blob_cache.h
@@ -79,11 +79,14 @@ struct TEvBlobCache {
TBlobRange BlobRange;
NKikimrProto::EReplyStatus Status;
TString Data;
+ const bool FromCache = false;
+ const TInstant ConstructTime = Now();
- TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data)
+ TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false)
: BlobRange(blobRange)
, Status(status)
, Data(data)
+ , FromCache(fromCache)
{}
};
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8d3ccc4a36d..afe68d8ca47 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -41,7 +41,7 @@ private:
constexpr ui64 INIT_BATCH_ROWS = 1000;
-constexpr i64 DEFAULT_READ_AHEAD_BYTES = 1*1024*1024;
+constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100*1024*1024; // 100 MiB (raised from 1 MiB by this change)
constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10);
constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5);
@@ -101,22 +101,36 @@ private:
}
bool ReadNextBlob() { // New version: gathers pending blob ranges from ScanIterator and sends one batched read request per blob id; it always returns true.
- auto blobRange = ScanIterator->GetNextBlobToRead();
- if (!blobRange.BlobId.IsValid()) {
- return false;
+ THashMap<TUnifiedBlobId, std::vector<NBlobCache::TBlobRange>> ranges; // ranges to request, grouped by blob id
+ while (InFlightReadBytes < MaxReadAheadBytes || !InFlightReads) { // keep collecting while under the read-ahead byte budget, or when nothing is in flight at all
+ auto blobRange = ScanIterator->GetNextBlobToRead();
+ if (!blobRange.BlobId.IsValid()) { // an invalid blob id ends the collection loop
+ break;
+ }
+ ++InFlightReads;
+ InFlightReadBytes += blobRange.Size;
+ ranges[blobRange.BlobId].emplace_back(blobRange);
+ }
+ if (ranges.size()) {
+ auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
+ for (auto&& i : ranges) {
+ bool fallback = externBlobs && externBlobs->count(i.first); // ForceFallback is set for blob ids present in ExternBlobs
+ NBlobCache::TReadBlobRangeOptions readOpts{
+ .CacheAfterRead = true,
+ .ForceFallback = fallback,
+ .IsBackgroud = false
+ };
+ ui32 size = 0; // total size of this blob's ranges; used only in the debug log below
+ for (auto&& s : i.second) {
+ size += s.Size;
+ }
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ Stats.RequestSent(i.second); // must run before i.second is moved into the event on the next line
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts)));
+ }
}
-
- auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
- bool fallback = externBlobs && externBlobs->count(blobRange.BlobId);
-
- NBlobCache::TReadBlobRangeOptions readOpts {
- .CacheAfterRead = true,
- .ForceFallback = fallback,
- .IsBackgroud = false
- };
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
- ++InFlightReads;
- InFlightReadBytes += blobRange.Size;
return true;
}
@@ -141,10 +155,14 @@ private:
}
void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " blobs response:"
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
--InFlightReads;
auto& event = *ev->Get();
const auto& blobRange = event.BlobRange;
+ Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime);
if (event.Status != NKikimrProto::EReplyStatus::OK) {
TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status);
@@ -176,6 +194,10 @@ private:
// Returns true if it was able to produce new batch
bool ProduceResults() {
+ auto g = Stats.MakeGuard("ProduceResults");
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " producing result: start"
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
Y_VERIFY(!Finished);
Y_VERIFY(ScanIterator);
@@ -237,7 +259,6 @@ private:
case NKikimrTxDataShard::EScanDataFormat::ARROW: {
MakeResult(0);
Result->ArrowBatch = batch;
-
Rows += batch->num_rows();
Bytes += NArrow::GetBatchDataSize(batch);
break;
@@ -249,6 +270,9 @@ private:
Y_VERIFY(numRows == 0, "Got non-empty result batch without last key");
}
SendResult(false, false);
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " producing result: finished"
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
return true;
}
@@ -282,14 +306,10 @@ private:
NextReadMetadata();
}
- size_t MIN_READY_RESULTS_IN_QUEUE = 3;
+ const size_t MIN_READY_RESULTS_IN_QUEUE = 3;
if (ScanIterator && ScanIterator->ReadyResultsCount() < MIN_READY_RESULTS_IN_QUEUE) {
// Make read-ahead requests for the subsequent blobs
- while (InFlightReadBytes < MaxReadAheadBytes || !InFlightReads) {
- if (!ReadNextBlob()) {
- break;
- }
- }
+ ReadNextBlob();
}
}
@@ -445,6 +465,15 @@ private:
}
Finished = Result->Finished;
+ if (Finished) {
+ Stats.Finish();
+ ALS_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN) <<
+ "Scanner finished " << ScanActorId << " and sent to " << ComputeActorId
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
+ << " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults
+ << " finished: " << Result->Finished << " pageFault: " << Result->PageFault
+ << " stats:" << Stats.DebugString();
+ }
Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?
++InFlightScanDataMessages;
@@ -530,6 +559,113 @@ private:
i64 InFlightReadBytes = 0;
i64 InFlightScanDataMessages = 0;
bool Finished = false;
+
+ class TBlobStats { // Accumulates part count, byte total and read-duration sum/max for received blob ranges.
+ private:
+ ui64 PartsCount = 0; // number of ranges passed to Received()
+ ui64 Bytes = 0; // sum of br.Size over all received ranges
+ TDuration ReadingDurationSum;
+ TDuration ReadingDurationMax;
+ public:
+ void Received(const NBlobCache::TBlobRange& br, const TDuration d) { // Records one received range `br` whose read took `d`.
+ ReadingDurationSum += d;
+ ReadingDurationMax = Max(ReadingDurationMax, d);
+ ++PartsCount;
+ Bytes += br.Size;
+ }
+ TString DebugString() const { // Renders the counters as ';'-terminated key=value pairs, or "NO_BLOBS;" when nothing was received.
+ TStringBuilder sb;
+ if (PartsCount) {
+ sb << "p_count=" << PartsCount << ";";
+ sb << "bytes=" << Bytes << ";";
+ sb << "d_avg=" << ReadingDurationSum / PartsCount << ";"; // division is safe: this branch requires PartsCount != 0
+ sb << "d_max=" << ReadingDurationMax << ";";
+ } else {
+ sb << "NO_BLOBS;";
+ }
+ return sb;
+ }
+ };
+
+ class TScanStats { // Collects per-scan statistics: request count/bytes, cache-hit vs. miss blob timings, and durations of named sections.
+ private:
+ THashMap<NBlobCache::TBlobRange, TInstant> StartBlobRequest; // request time of every range that was requested but not yet received
+ const TInstant StartInstant = Now(); // captured when the stats object is constructed
+ TInstant FinishInstant = TInstant::Zero(); // set by Finish(); stays zero until then
+ ui32 RequestsCount = 0; // incremented once per RequestSent() call, not once per range
+ ui64 RequestedBytes = 0;
+ TBlobStats CacheBlobs; // ranges reported with fromCache == true
+ TBlobStats MissBlobs; // ranges reported with fromCache == false
+ THashMap<TString, TDuration> GuardedDurations; // accumulated time per section name, filled by TGuard destructors
+ public:
+
+ TString DebugString() const { // Renders all collected statistics into one ';'-separated string.
+ TStringBuilder sb;
+ sb << "SCAN_STATS;";
+ sb << "d=" << FinishInstant - StartInstant << ";"; // NOTE(review): FinishInstant is zero until Finish() is called, so this is meaningful only afterwards
+ if (RequestsCount) {
+ sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};";
+ sb << "cache:{" << CacheBlobs.DebugString() << "};";
+ sb << "miss:{" << MissBlobs.DebugString() << "};";
+ } else {
+ sb << "NO_REQUESTS;";
+ }
+ for (auto&& i : GuardedDurations) {
+ sb << i.first << "=" << i.second << ";";
+ }
+ return sb;
+ }
+
+ class TGuard { // Scoped timer: its destructor adds the time elapsed since construction to Owner.GuardedDurations[SectionName].
+ private:
+ TScanStats& Owner;
+ const TInstant Start = Now();
+ TString SectionName;
+ public:
+ TGuard(const TString& sectionName, TScanStats& owner)
+ : Owner(owner)
+ , SectionName(sectionName)
+ {
+
+ }
+
+ ~TGuard() {
+ Owner.GuardedDurations[SectionName] += Now() - Start; // operator[] creates the entry on first use of a section name
+ }
+ };
+
+ TGuard MakeGuard(const TString& sectionName) { // Creates a TGuard that times `sectionName` against this stats object.
+ return TGuard(sectionName, *this);
+ }
+
+ void RequestSent(const std::vector<NBlobCache::TBlobRange>& ranges) { // Registers one request covering `ranges`, stamping each range with the same current time.
+ ++RequestsCount;
+ const TInstant now = Now();
+ for (auto&& i : ranges) {
+ Y_VERIFY(StartBlobRequest.emplace(i, now).second); // verification fails if this range is already pending
+ RequestedBytes += i.Size;
+ }
+ }
+
+ void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) { // Accounts a received range; its duration is replyInstant minus the recorded request time.
+ auto it = StartBlobRequest.find(br);
+ Y_VERIFY(it != StartBlobRequest.end()); // the range must have been registered by RequestSent()
+ const TDuration d = replyInstant - it->second;
+ if (fromCache) {
+ CacheBlobs.Received(br, d);
+ } else {
+ MissBlobs.Received(br, d);
+ }
+ StartBlobRequest.erase(it); // the range is no longer pending
+ }
+
+ void Finish() { // Records the finish time; verification fails if FinishInstant was already set.
+ Y_VERIFY(!FinishInstant);
+ FinishInstant = Now();
+ }
+ };
+
+ TScanStats Stats;
ui64 Rows = 0;
ui64 Bytes = 0;
ui32 PageFaults = 0;