diff options
author | niksaveliev <nik@saveliev.me> | 2024-02-09 16:51:48 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-09 16:51:48 +0600 |
commit | db527f78afca301a51755a23f4063841355dc333 (patch) | |
tree | c7b63835c7e6833620aae355d830477091075862 | |
parent | bb72994139ec070703dad699862e28099476596a (diff) | |
download | ydb-db527f78afca301a51755a23f4063841355dc333.tar.gz |
Fix pq writer and few renames (#1757)
-rw-r--r-- | ydb/core/persqueue/writer/writer.cpp | 65 |
1 files changed, 23 insertions, 42 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 5fac083b6c..8fec66d14a 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -470,7 +470,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return; } - const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired(); + const bool needToRequestQuota = Opts.CheckRequestUnits() && IsQuotaRequired(); size_t processed = 0; PendingQuotaAmount = 0; @@ -490,7 +490,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl cmd.SetSize(it->second.ByteSize()); cmd.SetLastRequest(false); - if (checkQuota) { + if (needToRequestQuota) { ++processed; PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()); PendingQuota.emplace_back(it->first); @@ -498,15 +498,15 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); - PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota }); + PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), needToRequestQuota }); Pending.erase(it); - if (checkQuota && processed == MAX_QUOTA_INFLIGHT) { + if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) { break; } } - if (checkQuota) { + if (needToRequestQuota) { RequestDataQuota(PendingQuotaAmount, ctx); } } @@ -527,18 +527,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl ReceivedReserve.emplace(it->first, std::move(it->second)); - ProcessQuota(); + ProcessQuotaAndWrite(); } - void ProcessQuota() { + void ProcessQuotaAndWrite() { auto rit = ReceivedReserve.begin(); auto qit = ReceivedQuota.begin(); while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) { auto& request = rit->second; const auto cookie = rit->first; - TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted); - if (!request.QuotaChecked || request.QuotaAccepted) { + TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted); + if (!request.QuotaCheckEnabled || request.QuotaAccepted) { // A situation when a quota was not requested or was received while waiting for a reserve Write(cookie, std::move(request.Request)); ReceivedReserve.erase(rit++); @@ -559,8 +559,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl while(rit != ReceivedReserve.end()) { auto& request = rit->second; const auto cookie = rit->first; - TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted); - if (request.QuotaChecked && !request.QuotaAccepted) { + TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted); + if (request.QuotaCheckEnabled && !request.QuotaAccepted) { break; } @@ -587,27 +587,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl ReceivedQuota.clear(); } - void Write(ui64 cookie) { - if (PendingReserve.empty()) { - ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02"); - Disconnected(EErrorCode::InternalError); - return; - } - auto it = PendingReserve.begin(); - - auto cookieReserveValid = (it->first == cookie); - auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie); - if (!(cookieReserveValid && cookieWriteValid)) { - ERROR("The cookie of Write is invalid. Cookie=" << cookie); - Disconnected(EErrorCode::InternalError); - return; - } - - Write(cookie, std::move(it->second.Request)); - - PendingReserve.erase(it); - } - void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) { auto ev = MakeHolder<TEvPersQueue::TEvRequest>(); ev->Record = std::move(req); @@ -651,24 +630,26 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return WriteResult(EErrorCode::InternalError, error, std::move(record)); } - WriteAccepted(cookie); - - if (PendingReserve.empty()) { - ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03"); + auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie); + if (!cookieWriteValid) { + ERROR("The cookie of Write is invalid. Cookie=" << cookie); Disconnected(EErrorCode::InternalError); return; } + + WriteAccepted(cookie); auto it = PendingReserve.begin(); auto& holder = it->second; - if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) { + if ((holder.QuotaCheckEnabled && !holder.QuotaAccepted) || !ReceivedReserve.empty()) { // There may be two situations: // - a quota has been requested, and the quota has not been received yet // - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed EnqueueReservedAndProcess(cookie); } else { - Write(cookie); + Write(cookie, std::move(it->second.Request)); } + PendingReserve.erase(it); } else { if (PendingWrite.empty()) { return WriteResult(EErrorCode::InternalError, "Unexpected Write response", std::move(record)); @@ -740,7 +721,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end()); PendingQuota.clear(); - ProcessQuota(); + ProcessQuotaAndWrite(); break; @@ -829,12 +810,12 @@ private: struct RequestHolder { NKikimrClient::TPersQueueRequest Request; - bool QuotaChecked; + bool QuotaCheckEnabled; bool QuotaAccepted; - RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked) + RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaCheckEnabled) : Request(std::move(request)) - , QuotaChecked(quotaChecked) + , QuotaCheckEnabled(quotaCheckEnabled) , QuotaAccepted(false) { } }; |