diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-28 12:37:05 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-28 12:37:05 +0300 |
commit | c270c8ce912c36e9639202e4cafa1fa5261c9925 (patch) | |
tree | 2e9e0a3ebab3b557a4098ae397f1898935287678 | |
parent | a86f9ad8d083e89a2137769078aff95bd4c6827a (diff) | |
download | ydb-c270c8ce912c36e9639202e4cafa1fa5261c9925.tar.gz |
one request for many blobs
additional stat infos
100Mb for data in flight
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 182 |
3 files changed, 169 insertions, 29 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 6e38d8ecb03..25e28d14b8e 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -210,7 +210,7 @@ private: const bool promote = (i64)MaxCacheDataSize && ev->Get()->ReadOptions.CacheAfterRead; const bool fallback = ev->Get()->ReadOptions.ForceFallback; - LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback); + LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback << " sender:" << ev->Sender); TReadItem readItem(ev->Get()->ReadOptions, blobRange); HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); @@ -226,9 +226,10 @@ private: if (it != Cache.End()) { Hits->Inc(); HitsBytes->Add(blobRange.Size); - return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx); + return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true); } + LOG_S_DEBUG("Miss cache: " << blobRange << " sender:" << sender); Misses->Inc(); // Prevent full cache flushing by exported blobs. Decrease propability of caching depending on cache size. @@ -403,7 +404,7 @@ private: ReadsInQueue->Set(ReadQueue.size()); - // We might need to free some space to accomodate the results of new reads + // We might need to free some space to accommodate the results of new reads Evict(ctx); std::vector<ui64> tabletReads; @@ -463,10 +464,10 @@ private: } void SendResult(const TActorId& to, const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, - const TString& data, const TActorContext& ctx) { + const TString& data, const TActorContext& ctx, const bool fromCache = false) { LOG_S_DEBUG("Send result: " << blobRange << " to: " << to << " status: " << status); - ctx.Send(to, new TEvBlobCache::TEvReadBlobRangeResult(blobRange, status, data)); + ctx.Send(to, new TEvBlobCache::TEvReadBlobRangeResult(blobRange, status, data, fromCache)); } void Handle(TEvBlobStorage::TEvGetResult::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h index fb6ab9db2fd..05661dd7bf2 100644 --- a/ydb/core/tx/columnshard/blob_cache.h +++ b/ydb/core/tx/columnshard/blob_cache.h @@ -79,11 +79,14 @@ struct TEvBlobCache { TBlobRange BlobRange; NKikimrProto::EReplyStatus Status; TString Data; + const bool FromCache = false; + const TInstant ConstructTime = Now(); - TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data) + TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false) : BlobRange(blobRange) , Status(status) , Data(data) + , FromCache(fromCache) {} }; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 8d3ccc4a36d..afe68d8ca47 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -41,7 +41,7 @@ private: constexpr ui64 INIT_BATCH_ROWS = 1000; -constexpr i64 DEFAULT_READ_AHEAD_BYTES = 1*1024*1024; +constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100*1024*1024; constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10); constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5); @@ -101,22 +101,36 @@ private: } bool ReadNextBlob() { - auto blobRange = ScanIterator->GetNextBlobToRead(); - if (!blobRange.BlobId.IsValid()) { - return false; + THashMap<TUnifiedBlobId, std::vector<NBlobCache::TBlobRange>> ranges; + while (InFlightReadBytes < MaxReadAheadBytes || !InFlightReads) { + auto blobRange = ScanIterator->GetNextBlobToRead(); + if (!blobRange.BlobId.IsValid()) { + break; + } + ++InFlightReads; + InFlightReadBytes += blobRange.Size; + ranges[blobRange.BlobId].emplace_back(blobRange); + } + if (ranges.size()) { + auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; + for (auto&& i : ranges) { + bool fallback = externBlobs && externBlobs->count(i.first); + NBlobCache::TReadBlobRangeOptions readOpts{ + .CacheAfterRead = true, + .ForceFallback = fallback, + .IsBackgroud = false + }; + ui32 size = 0; + for (auto&& s : i.second) { + size += s.Size; + } + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + Stats.RequestSent(i.second); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts))); + } } - - auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; - bool fallback = externBlobs && externBlobs->count(blobRange.BlobId); - - NBlobCache::TReadBlobRangeOptions readOpts { - .CacheAfterRead = true, - .ForceFallback = fallback, - .IsBackgroud = false - }; - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); - ++InFlightReads; - InFlightReadBytes += blobRange.Size; return true; } @@ -141,10 +155,14 @@ private: } void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " blobs response:" + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); --InFlightReads; auto& event = *ev->Get(); const auto& blobRange = event.BlobRange; + Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime); if (event.Status != NKikimrProto::EReplyStatus::OK) { TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status); @@ -176,6 +194,10 @@ private: // Returns true if it was able to produce new batch bool ProduceResults() { + auto g = Stats.MakeGuard("ProduceResults"); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " producing result: start" + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); Y_VERIFY(!Finished); Y_VERIFY(ScanIterator); @@ -237,7 +259,6 @@ private: case NKikimrTxDataShard::EScanDataFormat::ARROW: { MakeResult(0); Result->ArrowBatch = batch; - Rows += batch->num_rows(); Bytes += NArrow::GetBatchDataSize(batch); break; @@ -249,6 +270,9 @@ private: Y_VERIFY(numRows == 0, "Got non-empty result batch without last key"); } SendResult(false, false); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " producing result: finished" + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); return true; } @@ -282,14 +306,10 @@ private: NextReadMetadata(); } - size_t MIN_READY_RESULTS_IN_QUEUE = 3; + const size_t MIN_READY_RESULTS_IN_QUEUE = 3; if (ScanIterator && ScanIterator->ReadyResultsCount() < MIN_READY_RESULTS_IN_QUEUE) { // Make read-ahead requests for the subsequent blobs - while (InFlightReadBytes < MaxReadAheadBytes || !InFlightReads) { - if (!ReadNextBlob()) { - break; - } - } + ReadNextBlob(); } } @@ -445,6 +465,15 @@ private: } Finished = Result->Finished; + if (Finished) { + Stats.Finish(); + ALS_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN) << + "Scanner finished " << ScanActorId << " and sent to " << ComputeActorId + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId + << " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults + << " finished: " << Result->Finished << " pageFault: " << Result->PageFault + << " stats:" << Stats.DebugString(); + } Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ? ++InFlightScanDataMessages; @@ -530,6 +559,113 @@ private: i64 InFlightReadBytes = 0; i64 InFlightScanDataMessages = 0; bool Finished = false; + + class TBlobStats { + private: + ui64 PartsCount = 0; + ui64 Bytes = 0; + TDuration ReadingDurationSum; + TDuration ReadingDurationMax; + public: + void Received(const NBlobCache::TBlobRange& br, const TDuration d) { + ReadingDurationSum += d; + ReadingDurationMax = Max(ReadingDurationMax, d); + ++PartsCount; + Bytes += br.Size; + } + TString DebugString() const { + TStringBuilder sb; + if (PartsCount) { + sb << "p_count=" << PartsCount << ";"; + sb << "bytes=" << Bytes << ";"; + sb << "d_avg=" << ReadingDurationSum / PartsCount << ";"; + sb << "d_max=" << ReadingDurationMax << ";"; + } else { + sb << "NO_BLOBS;"; + } + return sb; + } + }; + + class TScanStats { + private: + THashMap<NBlobCache::TBlobRange, TInstant> StartBlobRequest; + const TInstant StartInstant = Now(); + TInstant FinishInstant = TInstant::Zero(); + ui32 RequestsCount = 0; + ui64 RequestedBytes = 0; + TBlobStats CacheBlobs; + TBlobStats MissBlobs; + THashMap<TString, TDuration> GuardedDurations; + public: + + TString DebugString() const { + TStringBuilder sb; + sb << "SCAN_STATS;"; + sb << "d=" << FinishInstant - StartInstant << ";"; + if (RequestsCount) { + sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};"; + sb << "cache:{" << CacheBlobs.DebugString() << "};"; + sb << "miss:{" << MissBlobs.DebugString() << "};"; + } else { + sb << "NO_REQUESTS;"; + } + for (auto&& i : GuardedDurations) { + sb << i.first << "=" << i.second << ";"; + } + return sb; + } + + class TGuard { + private: + TScanStats& Owner; + const TInstant Start = Now(); + TString SectionName; + public: + TGuard(const TString& sectionName, TScanStats& owner) + : Owner(owner) + , SectionName(sectionName) + { + + } + + ~TGuard() { + Owner.GuardedDurations[SectionName] += Now() - Start; + } + }; + + TGuard MakeGuard(const TString& sectionName) { + return TGuard(sectionName, *this); + } + + void RequestSent(const std::vector<NBlobCache::TBlobRange>& ranges) { + ++RequestsCount; + const TInstant now = Now(); + for (auto&& i : ranges) { + Y_VERIFY(StartBlobRequest.emplace(i, now).second); + RequestedBytes += i.Size; + } + } + + void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) { + auto it = StartBlobRequest.find(br); + Y_VERIFY(it != StartBlobRequest.end()); + const TDuration d = replyInstant - it->second; + if (fromCache) { + CacheBlobs.Received(br, d); + } else { + MissBlobs.Received(br, d); + } + StartBlobRequest.erase(it); + } + + void Finish() { + Y_VERIFY(!FinishInstant); + FinishInstant = Now(); + } + }; + + TScanStats Stats; ui64 Rows = 0; ui64 Bytes = 0; ui32 PageFaults = 0; |