aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorniksaveliev <nik@saveliev.me>2024-02-09 16:51:48 +0600
committerGitHub <noreply@github.com>2024-02-09 16:51:48 +0600
commitdb527f78afca301a51755a23f4063841355dc333 (patch)
treec7b63835c7e6833620aae355d830477091075862
parentbb72994139ec070703dad699862e28099476596a (diff)
downloadydb-db527f78afca301a51755a23f4063841355dc333.tar.gz
Fix pq writer and few renames (#1757)
-rw-r--r--ydb/core/persqueue/writer/writer.cpp65
1 files changed, 23 insertions, 42 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 5fac083b6c..8fec66d14a 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -470,7 +470,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return;
}
- const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
+ const bool needToRequestQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
size_t processed = 0;
PendingQuotaAmount = 0;
@@ -490,7 +490,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
cmd.SetSize(it->second.ByteSize());
cmd.SetLastRequest(false);
- if (checkQuota) {
+ if (needToRequestQuota) {
++processed;
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
PendingQuota.emplace_back(it->first);
@@ -498,15 +498,15 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
- PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota });
+ PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), needToRequestQuota });
Pending.erase(it);
- if (checkQuota && processed == MAX_QUOTA_INFLIGHT) {
+ if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) {
break;
}
}
- if (checkQuota) {
+ if (needToRequestQuota) {
RequestDataQuota(PendingQuotaAmount, ctx);
}
}
@@ -527,18 +527,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
ReceivedReserve.emplace(it->first, std::move(it->second));
- ProcessQuota();
+ ProcessQuotaAndWrite();
}
- void ProcessQuota() {
+ void ProcessQuotaAndWrite() {
auto rit = ReceivedReserve.begin();
auto qit = ReceivedQuota.begin();
while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) {
auto& request = rit->second;
const auto cookie = rit->first;
- TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
- if (!request.QuotaChecked || request.QuotaAccepted) {
+ TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
+ if (!request.QuotaCheckEnabled || request.QuotaAccepted) {
// A situation when a quota was not requested or was received while waiting for a reserve
Write(cookie, std::move(request.Request));
ReceivedReserve.erase(rit++);
@@ -559,8 +559,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
while(rit != ReceivedReserve.end()) {
auto& request = rit->second;
const auto cookie = rit->first;
- TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
- if (request.QuotaChecked && !request.QuotaAccepted) {
+ TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
+ if (request.QuotaCheckEnabled && !request.QuotaAccepted) {
break;
}
@@ -587,27 +587,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
ReceivedQuota.clear();
}
- void Write(ui64 cookie) {
- if (PendingReserve.empty()) {
- ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02");
- Disconnected(EErrorCode::InternalError);
- return;
- }
- auto it = PendingReserve.begin();
-
- auto cookieReserveValid = (it->first == cookie);
- auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
- if (!(cookieReserveValid && cookieWriteValid)) {
- ERROR("The cookie of Write is invalid. Cookie=" << cookie);
- Disconnected(EErrorCode::InternalError);
- return;
- }
-
- Write(cookie, std::move(it->second.Request));
-
- PendingReserve.erase(it);
- }
-
void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) {
auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
ev->Record = std::move(req);
@@ -651,24 +630,26 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return WriteResult(EErrorCode::InternalError, error, std::move(record));
}
- WriteAccepted(cookie);
-
- if (PendingReserve.empty()) {
- ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03");
+ auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
+ if (!cookieWriteValid) {
+ ERROR("The cookie of Write is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return;
}
+
+ WriteAccepted(cookie);
auto it = PendingReserve.begin();
auto& holder = it->second;
- if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) {
+ if ((holder.QuotaCheckEnabled && !holder.QuotaAccepted) || !ReceivedReserve.empty()) {
// There may be two situations:
// - a quota has been requested, and the quota has not been received yet
// - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed
EnqueueReservedAndProcess(cookie);
} else {
- Write(cookie);
+ Write(cookie, std::move(it->second.Request));
}
+ PendingReserve.erase(it);
} else {
if (PendingWrite.empty()) {
return WriteResult(EErrorCode::InternalError, "Unexpected Write response", std::move(record));
@@ -740,7 +721,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
PendingQuota.clear();
- ProcessQuota();
+ ProcessQuotaAndWrite();
break;
@@ -829,12 +810,12 @@ private:
struct RequestHolder {
NKikimrClient::TPersQueueRequest Request;
- bool QuotaChecked;
+ bool QuotaCheckEnabled;
bool QuotaAccepted;
- RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked)
+ RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaCheckEnabled)
: Request(std::move(request))
- , QuotaChecked(quotaChecked)
+ , QuotaCheckEnabled(quotaCheckEnabled)
, QuotaAccepted(false) {
}
};