author     tesseract <tesseract@yandex-team.com>  2023-04-17 15:25:49 +0300
committer  tesseract <tesseract@yandex-team.com>  2023-04-17 15:25:49 +0300
commit     7b4fbde7aa7fa61fa5f38577482b5855bdfd1637 (patch)
tree       c27e188973e59e9deb695280b0b08009b9bacb53
parent     6e0e6037f0317cb933fdd1f45f1c4e86d3d73969 (diff)
download   ydb-7b4fbde7aa7fa61fa5f38577482b5855bdfd1637.tar.gz
Move write logic to a separate file
-rw-r--r--  ydb/core/persqueue/CMakeLists.darwin-x86_64.txt      1
-rw-r--r--  ydb/core/persqueue/CMakeLists.linux-aarch64.txt      1
-rw-r--r--  ydb/core/persqueue/CMakeLists.linux-x86_64.txt       1
-rw-r--r--  ydb/core/persqueue/CMakeLists.windows-x86_64.txt     1
-rw-r--r--  ydb/core/persqueue/partition.cpp                  1450
-rw-r--r--  ydb/core/persqueue/partition.h                       2
-rw-r--r--  ydb/core/persqueue/partition_init.cpp               42
-rw-r--r--  ydb/core/persqueue/partition_util.h                  3
-rw-r--r--  ydb/core/persqueue/partition_write.cpp            1422
9 files changed, 1470 insertions, 1453 deletions
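The diff below is essentially a pure code move: the write-path handlers (ReplyWrite, AnswerCurrentWrites, HandleWrites, HandleWriteResponse, AppendHeadWithNewWrites, and related helpers) leave partition.cpp, the new partition_write.cpp is added to target_sources() in each per-platform CMakeLists file, and partition.h keeps the declarations (only a default argument moves to the header). The following is a minimal, hypothetical C++ sketch of that pattern, not YDB code — the class shape and method bodies are invented placeholders, with comments marking which file each piece would live in after such a split.

#include <iostream>
#include <string>

// --- partition.h: declarations stay here, untouched by the move ------------
class TPartition {
public:
    void HandleWrites(const std::string& data);  // definition relocated
    void AnswerCurrentWrites();                  // definition relocated
};

// --- partition_write.cpp: new translation unit receiving the definitions ---
// (it must also be listed in target_sources() so the build picks it up)
void TPartition::HandleWrites(const std::string& data) {
    std::cout << "write: " << data << "\n";
}

void TPartition::AnswerCurrentWrites() {
    std::cout << "answering current writes\n";
}

// --- usage: behavior is unchanged, only the file layout differs ------------
int main() {
    TPartition p;
    p.HandleWrites("payload");
    p.AnswerCurrentWrites();
}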
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index ee7cf6c9143..c098f253b86 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index c5e43d10ce4..c7678bb40c1 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -54,6 +54,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index c5e43d10ce4..c7678bb40c1 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -54,6 +54,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index ee7cf6c9143..c098f253b86 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index bfa2bdbc840..7c894068c13 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -26,11 +26,8 @@ namespace NKikimr::NPQ { -static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; -static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; static const ui32 MAX_USER_ACTS = 1000; static const TDuration WAKE_TIMEOUT = TDuration::Seconds(5); -static const ui32 MAX_INLINE_SIZE = 1000; static const TDuration UPDATE_AVAIL_SIZE_INTERVAL = TDuration::MilliSeconds(100); static const TString WRITE_QUOTA_ROOT_PATH = "write-quota"; static const ui32 MAX_USERS = 1000; @@ -195,85 +192,11 @@ void TPartition::ReplyOk(const TActorContext& ctx, const 
ui64 dst) { ctx.Send(Tablet, MakeReplyOk(dst).Release()); } -void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { - THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); - NKikimrClient::TResponse& resp = response->Response; - resp.SetStatus(NMsgBusProxy::MSTATUS_OK); - resp.SetErrorCode(NPersQueue::NErrorCode::OK); - resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie); - ctx.Send(Tablet, response.Release()); -} - -void TPartition::ReplyWrite( - const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, - const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, - const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) { - Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); - Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo); - - THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); - NKikimrClient::TResponse& resp = response->Response; - resp.SetStatus(NMsgBusProxy::MSTATUS_OK); - resp.SetErrorCode(NPersQueue::NErrorCode::OK); - auto write = resp.MutablePartitionResponse()->AddCmdWriteResult(); - write->SetSourceId(sourceId); - write->SetSeqNo(seqNo); - write->SetWriteTimestampMS(writeTimestamp.MilliSeconds()); - if (totalParts > 1) - write->SetPartNo(partNo); - write->SetAlreadyWritten(already); - if (already) - write->SetMaxSeqNo(maxSeqNo); - write->SetOffset(offset); - - write->SetPartitionQuotedTimeMs(partitionQuotedTime.MilliSeconds()); - write->SetTopicQuotedTimeMs(topicQuotedTime.MilliSeconds()); - write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds()); - write->SetWriteTimeMs(writeTime.MilliSeconds()); - - ctx.Send(Tablet, response.Release()); -} - - void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp) { ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp).Release()); } - -static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, - TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) { - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - auto read = request->Record.AddCmdReadRange(); - auto range = read->MutableRange(); - TKeyPrefix from(c, partition); - if (!key.empty()) { - Y_VERIFY(key.StartsWith(TStringBuf(from.Data(), from.Size()))); - from.Clear(); - from.Append(key.data(), key.size()); - } - range->SetFrom(from.Data(), from.Size()); - - TKeyPrefix to(c, partition + 1); - range->SetTo(to.Data(), to.Size()); - - if(includeData) - read->SetIncludeData(true); - - if (dropTmp) { - auto del = request->Record.AddCmdDeleteRange(); - auto range = del->MutableRange(); - TKeyPrefix from(TKeyPrefix::TypeTmpData, partition); - range->SetFrom(from.Data(), from.Size()); - - TKeyPrefix to(TKeyPrefix::TypeTmpData, partition + 1); - range->SetTo(to.Data(), to.Size()); - } - - ctx.Send(dst, request.Release()); -} - - NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) { return NKikimrClient::TKeyValueRequest::EStorageChannel(NKikimrClient::TKeyValueRequest::MAIN + i); } @@ -286,15 +209,6 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) { } -void 
RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { - RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == ""); -} - -void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { - RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key); -} - - void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { TSet<TString> hasReadRule; @@ -676,16 +590,6 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { ScheduleUpdateAvailableSize(ctx); } -void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { - UpdateAvailableSize(ctx); - HandleWrites(ctx); -} - -void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { - UpdateAvailableSize(ctx); -} - - ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const { ui64 size = Size(); if (!DataKeysBody.empty()) { @@ -995,56 +899,6 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) Die(ctx); } -void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { - for (const auto& w : Requests) { - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full"); - if (w.IsWrite()) { - const auto& msg = w.GetWrite().Msg; - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); - WriteInflightSize -= msg.Data.size(); - } - } - - UpdateWriteBufferIsFullState(ctx.Now()); - Requests.clear(); - Y_VERIFY(Responses.empty()); - - WriteCycleSize = 0; - - ProcessReserveRequests(ctx); -} - -void TPartition::FailBadClient(const TActorContext& ctx) { - for (auto it = Owners.begin(); it != Owners.end();) { - it = DropOwner(it, ctx); - } - Y_VERIFY(Owners.empty()); - Y_VERIFY(ReservedSize == 0); - - for (const auto& w : Requests) { - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); - if (w.IsWrite()) { - const auto& msg = w.GetWrite().Msg; - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); - WriteInflightSize -= msg.Data.size(); - } - } - UpdateWriteBufferIsFullState(ctx.Now()); - Requests.clear(); - for (const auto& w : Responses) { - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); - if (w.IsWrite()) - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - } - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize); - Responses.clear(); - - ProcessChangeOwnerRequests(ctx); - ProcessReserveRequests(ctx); -} - bool CheckDiskStatus(const TStorageStatusFlags status) { return !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove); @@ -1135,43 +989,6 @@ void TPartition::UpdateUserInfoEndOffset(const TInstant& now) { } } -void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) { - - auto &owner = ev->Owner; - auto it = Owners.find(owner); - if (it == Owners.end()) { - Owners[owner]; - it = Owners.find(owner); - } - if (it->second.NeedResetOwner || ev->Force) { //change owner - Y_VERIFY(ReservedSize >= it->second.ReservedSize); - ReservedSize -= it->second.ReservedSize; - - 
it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie - //cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests' - EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx); - TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); - UpdateWriteBufferIsFullState(ctx.Now()); - ProcessReserveRequests(ctx); - } else { - it->second.WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release())); - } -} - - -THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { - Y_VERIFY(ReservedSize >= it->second.ReservedSize); - ReservedSize -= it->second.ReservedSize; - UpdateWriteBufferIsFullState(ctx.Now()); - TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); - for (auto& ev : it->second.WaitToChangeOwner) { //this request maybe could be done right now - WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release())); - } - auto jt = it; - ++jt; - Owners.erase(it); - return jt; -} void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet<TString> important; @@ -1208,14 +1025,6 @@ void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorC } -void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) { - bool res = OwnerPipes.insert(ev->Get()->PipeClient).second; - Y_VERIFY(res); - WaitToChangeOwner.push_back(ev->Release()); - ProcessChangeOwnerRequests(ctx); -} - - void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx) { const TString& owner = ev->Get()->Owner; @@ -1235,78 +1044,6 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex } } - -void TPartition::ProcessReserveRequests(const TActorContext& ctx) { - const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize(); - - while (!ReserveRequests.empty()) { - const TString& ownerCookie = ReserveRequests.front()->OwnerCookie; - const TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); - const ui64& size = ReserveRequests.front()->Size; - const ui64& cookie = ReserveRequests.front()->Cookie; - const bool& lastRequest = ReserveRequests.front()->LastRequest; - - auto it = Owners.find(owner); - if (it == Owners.end() || it->second.OwnerCookie != ownerCookie) { - ReplyError(ctx, cookie, NPersQueue::NErrorCode::BAD_REQUEST, "ReserveRequest from dead ownership session"); - ReserveRequests.pop_front(); - continue; - } - - const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize; - if (currentSize != 0 && currentSize + size > maxWriteInflightSize) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched"); - break; - } - - if (WaitingForSubDomainQuota(ctx, currentSize)) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace"); - break; - } - - it->second.AddReserveRequest(size, lastRequest); - ReservedSize += size; - - ReplyOk(ctx, cookie); - - ReserveRequests.pop_front(); - } - UpdateWriteBufferIsFullState(ctx.Now()); - TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); -} - -void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) { - WriteBufferIsFullCounter.UpdateWorkingTime(now); - 
WriteBufferIsFullCounter.UpdateState(ReservedSize + WriteInflightSize + WriteCycleSize >= Config.GetPartitionConfig().GetBorderWriteInflightSize()); -} - - - -void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) { - const TString& ownerCookie = ev->Get()->OwnerCookie; - TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); - const ui64& messageNo = ev->Get()->MessageNo; - - auto it = Owners.find(owner); - if (it == Owners.end() || it->second.OwnerCookie != ownerCookie) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, "ReserveRequest from dead ownership session"); - return; - } - - if (messageNo != it->second.NextMessageNo) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "reorder in reserve requests, waiting " << it->second.NextMessageNo << ", but got " << messageNo); - DropOwner(it, ctx); - ProcessChangeOwnerRequests(ctx); - return; - } - - ++it->second.NextMessageNo; - ReserveRequests.push_back(ev->Release()); - ProcessReserveRequests(ctx); -} - - void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TOffsetsResponse::TPartResult result; result.SetPartition(Partition); @@ -1970,12 +1707,6 @@ TReadAnswer TReadInfo::FormAnswer( } -void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { - HandleOnWrite(ev, ctx); - HandleWrites(ctx); -} - - void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ctx) { auto res = Subscriber.OnTimeout(ev); if (!res) @@ -2231,125 +1962,6 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { } } -void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { - ui64 offset = EndOffset; - while (!Responses.empty()) { - const auto& response = Responses.front(); - - const TDuration quotedTime = response.QuotedTime; - const TDuration queueTime = response.QueueTime; - const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline; - - if (response.IsWrite()) { - const auto& writeResponse = response.GetWrite(); - const TString& s = writeResponse.Msg.SourceId; - const ui64& seqNo = writeResponse.Msg.SeqNo; - const ui16& partNo = writeResponse.Msg.PartNo; - const ui16& totalParts = writeResponse.Msg.TotalParts; - const TMaybe<ui64>& wrOffset = writeResponse.Offset; - - bool already = false; - - auto it = SourceIdStorage.GetInMemorySourceIds().find(s); - - ui64 maxSeqNo = 0; - ui64 maxOffset = 0; - - if (it != SourceIdStorage.GetInMemorySourceIds().end()) { - maxSeqNo = it->second.SeqNo; - maxOffset = it->second.Offset; - if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { - already = true; - } - } - - if (!already) { - if (wrOffset) { - Y_VERIFY(*wrOffset >= offset); - offset = *wrOffset; - } - } - if (!already && partNo + 1 == totalParts) { - if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); - SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp); - } else { - SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp)); - } - - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); - } - ReplyWrite( - ctx, writeResponse.Cookie, s, seqNo, partNo, totalParts, - already ? 
maxOffset : offset, CurrentTimestamp, already, maxSeqNo, - quotedTime, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime - ); - LOG_DEBUG_S( - ctx, - NKikimrServices::PERSQUEUE, - "Answering for message sourceid: '" << EscapeC(s) << - "', Topic: '" << TopicConverter->GetClientsideName() << - "', Partition: " << Partition << - ", SeqNo: " << seqNo << ", partNo: " << partNo << - ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") - ); - if (PartitionWriteQuotaWaitCounter) { - PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds()); - } - - if (!already && partNo + 1 == totalParts) - ++offset; - } else if (response.IsOwnership()) { - const TString& ownerCookie = response.GetOwnership().OwnerCookie; - auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie)); - if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) { - ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie); - } else { - ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); - } - } else if (response.IsRegisterMessageGroup()) { - const auto& body = response.GetRegisterMessageGroup().Body; - - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } - - Y_VERIFY(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); - ReplyOk(ctx, response.GetCookie()); - } else if (response.IsDeregisterMessageGroup()) { - const auto& body = response.GetDeregisterMessageGroup().Body; - - SourceIdStorage.DeregisterSourceId(body.SourceId); - ReplyOk(ctx, response.GetCookie()); - } else if (response.IsSplitMessageGroup()) { - const auto& split = response.GetSplitMessageGroup(); - - for (const auto& body : split.Deregistrations) { - SourceIdStorage.DeregisterSourceId(body.SourceId); - } - - for (const auto& body : split.Registrations) { - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } - - Y_VERIFY(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); - } - - ReplyOk(ctx, response.GetCookie()); - } else { - Y_FAIL("Unexpected message"); - } - Responses.pop_front(); - } - TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero(); -} - - void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) { if (userInfo.ReadScheduled) return; @@ -2554,73 +2166,6 @@ void TPartition::CheckHeadConsistency() const { } -void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { - if (!CompactedKeys.empty()) - HeadKeys.clear(); - - if (NewHeadKey.Size > 0) { - while (!HeadKeys.empty() && - (HeadKeys.back().Key.GetOffset() > NewHeadKey.Key.GetOffset() || HeadKeys.back().Key.GetOffset() == NewHeadKey.Key.GetOffset() - && HeadKeys.back().Key.GetPartNo() >= NewHeadKey.Key.GetPartNo())) { - HeadKeys.pop_back(); - } - HeadKeys.push_back(NewHeadKey); - NewHeadKey = TDataKey{TKey{}, 0, TInstant::Zero(), 0}; - } - - if (CompactedKeys.empty() && NewHead.PackedSize == 0) { //Nothing writed at all - return; - } - - Y_VERIFY(EndOffset == Head.GetNextOffset()); - - if (!CompactedKeys.empty() || Head.PackedSize == 0) { //has compactedkeys or head is already empty - Head.PackedSize = 0; - Head.Offset = NewHead.Offset; - Head.PartNo = NewHead.PartNo; //no partNo at this 
point - Head.Batches.clear(); - } - - while (!CompactedKeys.empty()) { - const auto& ck = CompactedKeys.front(); - BodySize += ck.second; - Y_VERIFY(!ck.first.IsHead()); - ui64 lastOffset = DataKeysBody.empty() ? 0 : (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()); - Y_VERIFY(lastOffset <= ck.first.GetOffset()); - if (DataKeysBody.empty()) { - StartOffset = ck.first.GetOffset() + (ck.first.GetPartNo() > 0 ? 1 : 0); - } else { - if (lastOffset < ck.first.GetOffset()) { - GapOffsets.push_back(std::make_pair(lastOffset, ck.first.GetOffset())); - GapSize += ck.first.GetOffset() - lastOffset; - } - } - DataKeysBody.push_back({ck.first, ck.second, ctx.Now(), DataKeysBody.empty() ? 0 : DataKeysBody.back().CumulativeSize + DataKeysBody.back().Size}); - - CompactedKeys.pop_front(); - } // head cleared, all data moved to body - - //append Head with newHead - while (!NewHead.Batches.empty()) { - Head.Batches.push_back(NewHead.Batches.front()); - NewHead.Batches.pop_front(); - } - Head.PackedSize += NewHead.PackedSize; - - if (Head.PackedSize > 0 && DataKeysBody.empty()) { - StartOffset = Head.Offset + (Head.PartNo > 0 ? 1 : 0); - } - - EndOffset = Head.GetNextOffset(); - NewHead.Clear(); - NewHead.Offset = EndOffset; - - CheckHeadConsistency(); - - UpdateUserInfoEndOffset(ctx.Now()); -} - - ui64 TPartition::GetSizeLag(i64 offset) { ui64 sizeLag = 0; if (!DataKeysBody.empty() && (offset < (i64)Head.Offset || offset == (i64)Head.Offset && Head.PartNo > 0)) { //there will be something in body @@ -2932,10 +2477,6 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& } } -void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) { - HandleWriteResponse(ctx); -} - void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) { Y_VERIFY(cookie == SET_OFFSET_COOKIE); @@ -3946,325 +3487,6 @@ void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) { ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize()); } -void TPartition::HandleWriteResponse(const TActorContext& ctx) { - - Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite); - ui64 prevEndOffset = EndOffset; - - ui32 totalLatencyMs = (ctx.Now() - WriteCycleStartTime).MilliSeconds(); - ui32 writeLatencyMs = (ctx.Now() - WriteStartTime).MilliSeconds(); - - WriteLatency.IncFor(writeLatencyMs, 1); - if (writeLatencyMs >= AppData(ctx)->PQConfig.GetWriteLatencyBigMs()) { - SLIBigLatency.Inc(); - } - - TabletCounters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize); - TabletCounters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize); - TabletCounters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize); - if (BytesWrittenGrpc) - BytesWrittenGrpc.Inc(WriteNewSizeInternal); - if (BytesWrittenTotal) - BytesWrittenTotal.Inc(WriteNewSize); - - if (BytesWrittenUncompressed) - BytesWrittenUncompressed.Inc(WriteNewSizeUncompressed); - if (BytesWrittenComp) - BytesWrittenComp.Inc(WriteCycleSize); - if (MsgsWrittenGrpc) - MsgsWrittenGrpc.Inc(WriteNewMessagesInternal); - if (MsgsWrittenTotal) - MsgsWrittenTotal.Inc(WriteNewMessages); - - //All ok - auto now = ctx.Now(); - const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig(); - if (quotingConfig.GetTopicWriteQuotaEntityToLimit() == 
NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) { - WriteQuota->Exaust(WriteNewSize, now); - } else { - WriteQuota->Exaust(WriteCycleSize, now); - } - for (auto& avg : AvgWriteBytes) { - avg.Update(WriteNewSize, now); - } - for (auto& avg : AvgQuotaBytes) { - avg.Update(WriteNewSize, now); - } - - WriteCycleSize = 0; - WriteNewSize = 0; - WriteNewSizeInternal = 0; - WriteNewSizeUncompressed = 0; - WriteNewMessages = 0; - WriteNewMessagesInternal = 0; - UpdateWriteBufferIsFullState(now); - - AnswerCurrentWrites(ctx); - SyncMemoryStateWithKVState(ctx); - - //if EndOffset changed there could be subscriptions witch could be completed - TVector<std::pair<TReadInfo, ui64>> reads = Subscriber.GetReads(EndOffset); - for (auto& read : reads) { - Y_VERIFY(EndOffset > read.first.Offset); - ProcessRead(ctx, std::move(read.first), read.second, true); - } - //same for read requests - ProcessHasDataRequests(ctx); - - ProcessTimestampsForNewData(prevEndOffset, ctx); - - HandleWrites(ctx); -} - -void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { - ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){ - return sum + msg.Data.size(); - }); - - bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom(); - - if (mirroredPartition && !ev->Get()->OwnerCookie.empty()) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "Write to mirrored topic is forbiden "); - return; - } - - ui64 decReservedSize = 0; - TStringBuf owner; - - if (!mirroredPartition && !ev->Get()->IsDirectWrite) { - owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie); - auto it = Owners.find(owner); - - if (it == Owners.end() || it->second.NeedResetOwner) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, - TStringBuilder() << "new GetOwnership request needed for owner " << owner); - return; - } - - if (it->second.SourceIdDeleted) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, - TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. 
" - << "Current count of sourceIds is " << SourceIdStorage.GetInMemorySourceIds().size() << " and limit is " << Config.GetPartitionConfig().GetSourceIdMaxCounts() - << ", current minimum sourceid timestamp(Ms) is " << SourceIdStorage.MinAvailableTimestamp(ctx.Now()).MilliSeconds() - << " and border timestamp(Ms) is " << ((ctx.Now() - TInstant::Seconds(Config.GetPartitionConfig().GetSourceIdLifetimeSeconds())).MilliSeconds())); - return; - } - - if (it->second.OwnerCookie != ev->Get()->OwnerCookie) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, - TStringBuilder() << "incorrect ownerCookie " << ev->Get()->OwnerCookie << ", must be " << it->second.OwnerCookie); - return; - } - - if (ev->Get()->MessageNo != it->second.NextMessageNo) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "reorder in requests, waiting " << it->second.NextMessageNo << ", but got " << ev->Get()->MessageNo); - DropOwner(it, ctx); - return; - } - - ++it->second.NextMessageNo; - decReservedSize = it->second.DecReservedSize(); - } - - ReservedSize -= decReservedSize; - TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); - - TMaybe<ui64> offset = ev->Get()->Offset; - - if (WriteInflightSize > Config.GetPartitionConfig().GetMaxWriteInflightSize()) { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); - - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD, - TStringBuilder() << "try later. Write inflight limit reached. " - << WriteInflightSize << " vs. maximum " << Config.GetPartitionConfig().GetMaxWriteInflightSize()); - return; - } - for (const auto& msg: ev->Get()->Msgs) { - //this is checked in pq_impl when forming EvWrite request - Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite); - Y_VERIFY(!msg.Data.empty()); - - if (msg.SeqNo > (ui64)Max<i64>()) { - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Request to write wrong SeqNo. Partition " - << Partition << " sourceId '" << EscapeC(msg.SourceId) << "' seqno " << msg.SeqNo); - - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "wrong SeqNo " << msg.SeqNo); - return; - } - - ui32 sz = msg.Data.size() + msg.SourceId.size() + TClientBlob::OVERHEAD; - - if (sz > MAX_BLOB_PART_SIZE) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "too big message " << sz << " vs. 
maximum " << MAX_BLOB_PART_SIZE); - return; - } - - if (!mirroredPartition) { - SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner); - } - } - - const ui64 maxSize = Config.GetPartitionConfig().GetMaxSizeInPartition(); - const ui64 maxCount = Config.GetPartitionConfig().GetMaxCountInPartition(); - if (EndOffset - StartOffset >= maxCount || Size() >= maxSize) { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); - - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL, - Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, Size(), maxSize)); - return; - } - ui64 size = 0; - for (auto& msg: ev->Get()->Msgs) { - size += msg.Data.size(); - bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts; - EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx); - if (offset && needToChangeOffset) - ++*offset; - } - WriteInflightSize += size; - - // TODO: remove decReservedSize == 0 - Y_VERIFY(size <= decReservedSize || decReservedSize == 0); - UpdateWriteBufferIsFullState(ctx.Now()); -} - -void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - HandleOnWrite(ev, ctx); - HandleWrites(ctx); -} - -void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - const auto& body = ev->Get()->Body; - - auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); - if (it != SourceIdStorage.GetInMemorySourceIds().end()) { - if (!it->second.Explicit) { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - "Trying to register implicitly registered SourceId"); - } - - switch (it->second.State) { - case TSourceIdInfo::EState::Registered: - return ReplyOk(ctx, ev->Get()->Cookie); - case TSourceIdInfo::EState::PendingRegistration: - if (!body.AfterSplit) { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - "AfterSplit must be set"); - } - break; - default: - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::ERROR, - TStringBuilder() << "Unknown state: " << static_cast<ui32>(it->second.State)); - } - } else if (body.AfterSplit) { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - "SourceId not found, registration cannot be completed"); - } - - EmplaceRequest(TRegisterMessageGroupMsg(*ev->Get()), ctx); -} - -void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - HandleOnWrite(ev, ctx); - HandleWrites(ctx); -} - -void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - const auto& body = ev->Get()->Body; - - auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); - if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, - "SourceId doesn't exist"); - } - - EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx); -} - -void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { - HandleOnWrite(ev, ctx); - HandleWrites(ctx); -} - -void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { - if (ev->Get()->Deregistrations.size() > 1) { - return ReplyError(ctx, 
ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - TStringBuilder() << "Currently, single deregistrations are supported"); - } - - TSplitMessageGroupMsg msg(ev->Get()->Cookie); - - for (auto& body : ev->Get()->Deregistrations) { - auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); - if (it != SourceIdStorage.GetInMemorySourceIds().end()) { - msg.Deregistrations.push_back(std::move(body)); - } else { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, - "SourceId doesn't exist"); - } - } - - for (auto& body : ev->Get()->Registrations) { - auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); - if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - msg.Registrations.push_back(std::move(body)); - } else { - if (!it->second.Explicit) { - return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, - "Trying to register implicitly registered SourceId"); - } - } - } - - EmplaceRequest(std::move(msg), ctx); -} - -std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) { - std::pair<TKey, ui32> res({key, size}); - ui32 x = headCleared ? 0 : Head.PackedSize; - Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u, [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == NewHead.PackedSize + x); - for (auto it = DataKeysHead.rbegin(); it != DataKeysHead.rend(); ++it) { - auto jt = it; ++jt; - if (it->NeedCompaction()) { - res = it->Compact(); - if (jt != DataKeysHead.rend()) { - jt->AddKey(res.first, res.second); - } - } else { - Y_VERIFY(jt == DataKeysHead.rend() || !jt->NeedCompaction()); //compact must start from last level, not internal - } - Y_VERIFY(!it->NeedCompaction()); - } - Y_VERIFY(res.second >= size); - Y_VERIFY(res.first.GetOffset() < key.GetOffset() || res.first.GetOffset() == key.GetOffset() && res.first.GetPartNo() <= key.GetPartNo()); - return res; -} - - -void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { - while (!WaitToChangeOwner.empty()) { - auto &ev = WaitToChangeOwner.front(); - if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe - ProcessChangeOwnerRequest(ev.Release(), ctx); - } else { - ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::ERROR, "Pipe for GetOwnershipRequest is already dead"); - } - WaitToChangeOwner.pop_front(); - } - if (CurrentStateFunc() == &TThis::StateIdle) { - HandleWrites(ctx); - } -} - - void TPartition::BecomeIdle(const TActorContext&) { Become(&TThis::StateIdle); } @@ -4284,480 +3506,6 @@ void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue: } } - -void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST) { - ReplyError(ctx, p.Cookie, errorCode, errorStr); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); - FailBadClient(ctx); - NewHead.Clear(); - NewHead.Offset = EndOffset; - sourceIdWriter.Clear(); - request->Record.Clear(); - PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); - CompactedKeys.clear(); - - WriteCycleSize = 0; -} - - -bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, 
const TActorContext& ctx, - TSourceIdWriter& sourceIdWriter) { - - ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; - - WriteCycleSize = 0; - WriteNewSize = 0; - WriteNewSizeUncompressed = 0; - WriteNewMessages = 0; - UpdateWriteBufferIsFullState(ctx.Now()); - CurrentTimestamp = ctx.Now(); - - NewHead.Offset = EndOffset; - NewHead.PartNo = 0; - NewHead.PackedSize = 0; - - Y_VERIFY(NewHead.Batches.empty()); - - bool oldPartsCleared = false; - bool headCleared = (Head.PackedSize == 0); - - - //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too) - //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs - //otherwise write this batch as is to head; - - while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big - auto pp = Requests.front(); - Requests.pop_front(); - - if (!pp.IsWrite()) { - if (pp.IsRegisterMessageGroup()) { - auto& body = pp.GetRegisterMessageGroup().Body; - - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } - - body.AssignedOffset = curOffset; - sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange)); - } else if (pp.IsDeregisterMessageGroup()) { - sourceIdWriter.DeregisterSourceId(pp.GetDeregisterMessageGroup().Body.SourceId); - } else if (pp.IsSplitMessageGroup()) { - for (auto& body : pp.GetSplitMessageGroup().Deregistrations) { - sourceIdWriter.DeregisterSourceId(body.SourceId); - } - - for (auto& body : pp.GetSplitMessageGroup().Registrations) { - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } - - body.AssignedOffset = curOffset; - sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange), true); - } - } else { - Y_VERIFY(pp.IsOwnership()); - } - - EmplaceResponse(std::move(pp), ctx); - continue; - } - - Y_VERIFY(pp.IsWrite()); - auto& p = pp.GetWrite(); - - WriteInflightSize -= p.Msg.Data.size(); - - TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp); - //check already written - - ui64 poffset = p.Offset ? *p.Offset : curOffset; - - auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId); - auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId); - if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) { - bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end()); - bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end()); - - if (poffset >= curOffset) { - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Already written message. Topic: '" << TopicConverter->GetClientsideName() - << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) - << "'. Message seqNo = " << p.Msg.SeqNo - << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) - << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? 
it_toWrite->second.SeqNo : 0) - << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset - ); - - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); - } else { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); - } - - TString().swap(p.Msg.Data); - EmplaceResponse(std::move(pp), ctx); - continue; - } - - if (poffset < curOffset) { //too small offset - CancelAllWritesOnWrite(ctx, request, - TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo - << " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset, - p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); - return false; - } - - Y_VERIFY(poffset >= curOffset); - - bool needCompactHead = poffset > curOffset; - if (needCompactHead) { //got gap - if (p.Msg.PartNo != 0) { //gap can't be inside of partitioned message - CancelAllWritesOnWrite(ctx, request, - TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo - << " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset " - << poffset << ", must be " << curOffset, - p, sourceIdWriter); - return false; - } - curOffset = poffset; - } - - if (p.Msg.PartNo == 0) { //create new PartitionedBlob - //there could be parts from previous owner, clear them - if (!oldPartsCleared) { - oldPartsCleared = true; - auto del = request->Record.AddCmdDeleteRange(); - auto range = del->MutableRange(); - TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition); - range->SetFrom(from.Data(), from.Size()); - TKeyPrefix to(TKeyPrefix::TypeTmpData, Partition + 1); - range->SetTo(to.Data(), to.Size()); - } - - if (PartitionedBlob.HasFormedBlobs()) { - //clear currently-writed blobs - auto oldCmdWrite = request->Record.GetCmdWrite(); - request->Record.ClearCmdWrite(); - for (ui32 i = 0; i < (ui32)oldCmdWrite.size(); ++i) { - TKey key(oldCmdWrite.Get(i).GetKey()); - if (key.GetType() != TKeyPrefix::TypeTmpData) { - request->Record.AddCmdWrite()->CopyFrom(oldCmdWrite.Get(i)); - } - } - } - PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo, - p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead, - headCleared, needCompactHead, MaxBlobSize); - } - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) << - "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo - ); - TString s; - if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) { - //this must not be happen - client sends gaps, fail this client till the end - CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter); - //now no changes will leak - return false; - } - - WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size(); - WriteNewSizeInternal += p.Msg.External ? 
0 : (p.Msg.SourceId.size() + p.Msg.Data.size()); - WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size(); - if (p.Msg.PartNo == 0) { - ++WriteNewMessages; - if (!p.Msg.External) - ++WriteNewMessagesInternal; - } - - TMaybe<TPartData> partData; - if (p.Msg.TotalParts > 1) { //this is multi-part message - partData = TPartData(p.Msg.PartNo, p.Msg.TotalParts, p.Msg.TotalSize); - } - WriteTimestamp = ctx.Now(); - WriteTimestampEstimate = p.Msg.WriteTimestamp > 0 ? TInstant::MilliSeconds(p.Msg.WriteTimestamp) : WriteTimestamp; - TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.Data, std::move(partData), WriteTimestampEstimate, - TInstant::MilliSeconds(p.Msg.CreateTimestamp == 0 ? curOffset : p.Msg.CreateTimestamp), - p.Msg.UncompressedSize, p.Msg.PartitionKey, p.Msg.ExplicitHashKey); //remove curOffset when LB will report CTime - - const ui64 writeLagMs = - (WriteTimestamp - TInstant::MilliSeconds(p.Msg.CreateTimestamp)).MilliSeconds(); - WriteLagMs.Update(writeLagMs, WriteTimestamp); - if (InputTimeLag) { - InputTimeLag->IncFor(writeLagMs, 1); - if (p.Msg.PartNo == 0) { - MessageSize->IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1); - } - } - - bool lastBlobPart = blob.IsLastPart(); - - //will return compacted tmp blob - std::pair<TKey, TString> newWrite = PartitionedBlob.Add(std::move(blob)); - - if (!newWrite.second.empty()) { - auto write = request->Record.AddCmdWrite(); - write->SetKey(newWrite.first.Data(), newWrite.first.Size()); - write->SetValue(newWrite.second); - Y_VERIFY(!newWrite.first.IsHead()); - auto channel = GetChannel(NextChannel(newWrite.first.IsHead(), newWrite.second.Size())); - write->SetStorageChannel(channel); - write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); - - TKey resKey = newWrite.first; - resKey.SetType(TKeyPrefix::TypeData); - write->SetKeyToCache(resKey.Data(), resKey.Size()); - WriteCycleSize += newWrite.second.size(); - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << - "' partition " << Partition << - " part blob sourceId '" << EscapeC(p.Msg.SourceId) << - "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << - " result is " << TStringBuf(newWrite.first.Data(), newWrite.first.Size()) << - " size " << newWrite.second.size() - ); - } - - if (lastBlobPart) { - Y_VERIFY(PartitionedBlob.IsComplete()); - ui32 curWrites = 0; - for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs - TKey key(request->Record.GetCmdWrite(i).GetKey()); - if (key.GetType() == TKeyPrefix::TypeTmpData) { - key.SetType(TKeyPrefix::TypeData); - request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size())); - ++curWrites; - } - } - Y_VERIFY(curWrites <= PartitionedBlob.GetFormedBlobs().size()); - auto formedBlobs = PartitionedBlob.GetFormedBlobs(); - for (ui32 i = 0; i < formedBlobs.size(); ++i) { - const auto& x = formedBlobs[i]; - if (i + curWrites < formedBlobs.size()) { //this KV pair is already writed, rename needed - auto rename = request->Record.AddCmdRename(); - TKey key = x.first; - rename->SetOldKey(TString(key.Data(), key.Size())); - key.SetType(TKeyPrefix::TypeData); - rename->SetNewKey(TString(key.Data(), key.Size())); - } - if (!DataKeysBody.empty() && CompactedKeys.empty()) { - Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.first.GetOffset(), - "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, x.first.ToString().c_str()); - } - 
LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " " << x.first.ToString() << " size " << x.second << " WTime " << ctx.Now().MilliSeconds() - ); - - CompactedKeys.push_back(x); - CompactedKeys.back().first.SetType(TKeyPrefix::TypeData); - } - if (PartitionedBlob.HasFormedBlobs()) { //Head and newHead are cleared - headCleared = true; - NewHead.Clear(); - NewHead.Offset = PartitionedBlob.GetOffset(); - NewHead.PartNo = PartitionedBlob.GetHeadPartNo(); - NewHead.PackedSize = 0; - } - ui32 countOfLastParts = 0; - for (auto& x : PartitionedBlob.GetClientBlobs()) { - if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) { - NewHead.Batches.emplace_back(curOffset, x.GetPartNo(), TVector<TClientBlob>()); - NewHead.PackedSize += GetMaxHeaderSize(); //upper bound for packed size - } - if (x.IsLastPart()) { - ++countOfLastParts; - } - Y_VERIFY(!NewHead.Batches.back().Packed); - NewHead.Batches.back().AddBlob(x); - NewHead.PackedSize += x.GetBlobSize(); - if (NewHead.Batches.back().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) { - NewHead.Batches.back().Pack(); - NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob - - NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound - NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize(); - } - } - - Y_VERIFY(countOfLastParts == 1); - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " part blob complete sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo - << " partNo " << p.Msg.PartNo << " FormedBlobsCount " << PartitionedBlob.GetFormedBlobs().size() - << " NewHead: " << NewHead - ); - - if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) { - sourceIdWriter.RegisterSourceId(p.Msg.SourceId, p.Msg.SeqNo, curOffset, CurrentTimestamp); - } else { - sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(p.Msg.SeqNo, curOffset, CurrentTimestamp)); - } - - ++curOffset; - PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); - } - TString().swap(p.Msg.Data); - EmplaceResponse(std::move(pp), ctx); - } - - UpdateWriteBufferIsFullState(ctx.Now()); - - if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) { - NewHead.Batches.back().Pack(); - NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob - - NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound - NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize(); - } - - Y_VERIFY((headCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl - MaxWriteResponsesSize = Max<ui32>(MaxWriteResponsesSize, Responses.size()); - - return headCleared; -} - - -std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) { - bool needCompaction = false; - ui32 HeadSize = headCleared ? 
0 : Head.PackedSize; - if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize - >= Min<ui32>(MaxBlobSize, Config.GetPartitionConfig().GetLowWatermark())) - needCompaction = true; - - if (PartitionedBlob.IsInited()) { //has active partitioned blob - compaction is forbiden, head and newHead will be compacted when this partitioned blob is finished - needCompaction = false; - } - - Y_VERIFY(NewHead.PackedSize > 0 || needCompaction); //smthing must be here - - TKey key(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount(), !needCompaction); - - if (NewHead.PackedSize > 0) - DataKeysHead[TotalLevels - 1].AddKey(key, NewHead.PackedSize); - Y_VERIFY(HeadSize + NewHead.PackedSize <= 3 * MaxSizeCheck); - - std::pair<TKey, ui32> res; - - if (needCompaction) { //compact all - for (ui32 i = 0; i < TotalLevels; ++i) { - DataKeysHead[i].Clear(); - } - if (!headCleared) { //compacted blob must contain both head and NewHead - key = TKey(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(), - Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount(), false); - } //otherwise KV blob is not from head (!key.IsHead()) and contains only new data from NewHead - res = std::make_pair(key, HeadSize + NewHead.PackedSize); - } else { - res = Compact(key, NewHead.PackedSize, headCleared); - Y_VERIFY(res.first.IsHead());//may compact some KV blobs from head, but new KV blob is from head too - Y_VERIFY(res.second >= NewHead.PackedSize); //at least new data must be writed - } - Y_VERIFY(res.second <= MaxBlobSize); - return res; -} - -void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) { - const auto& key = res.first; - - TString valueD; - valueD.reserve(res.second); - ui32 pp = Head.FindPos(key.GetOffset(), key.GetPartNo()); - if (pp < Max<ui32>() && key.GetOffset() < EndOffset) { //this batch trully contains this offset - Y_VERIFY(pp < Head.Batches.size()); - Y_VERIFY(Head.Batches[pp].GetOffset() == key.GetOffset()); - Y_VERIFY(Head.Batches[pp].GetPartNo() == key.GetPartNo()); - for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here - Y_VERIFY(Head.Batches[pp].Packed); - valueD += Head.Batches[pp].Serialize(); - } - } - for (auto& b : NewHead.Batches) { - Y_VERIFY(b.Packed); - valueD += b.Serialize(); - } - - Y_VERIFY(res.second >= valueD.size()); - - if (res.second > valueD.size() && res.first.IsHead()) { //change to real size if real packed size is smaller - - Y_FAIL("Can't be here right now, only after merging of small batches"); - - for (auto it = DataKeysHead.rbegin(); it != DataKeysHead.rend(); ++it) { - if (it->KeysCount() > 0 ) { - auto res2 = it->PopBack(); - Y_VERIFY(res2 == res); - res2.second = valueD.size(); - - DataKeysHead[TotalLevels - 1].AddKey(res2.first, res2.second); - - res2 = Compact(res2.first, res2.second, headCleared); - - Y_VERIFY(res2.first == res.first); - Y_VERIFY(res2.second == valueD.size()); - res = res2; - break; - } - } - } - - Y_VERIFY(res.second == valueD.size() || res.first.IsHead()); - - CheckBlob(key, valueD); - - auto write = request->Record.AddCmdWrite(); - write->SetKey(key.Data(), key.Size()); - write->SetValue(valueD); - - if (!key.IsHead()) - write->SetKeyToCache(key.Data(), key.Size()); - - bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE; - - if (isInline) - write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); - else 
{ - auto channel = GetChannel(NextChannel(key.IsHead(), valueD.size())); - write->SetStorageChannel(channel); - write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); - } - - //Need to clear all compacted blobs - TKey k = CompactedKeys.empty() ? key : CompactedKeys.front().first; - ClearOldHead(k.GetOffset(), k.GetPartNo(), request); - - if (!key.IsHead()) { - if (!DataKeysBody.empty() && CompactedKeys.empty()) { - Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= key.GetOffset(), - "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, key.ToString().c_str()); - } - CompactedKeys.push_back(res); - NewHead.Clear(); - NewHead.Offset = res.first.GetOffset() + res.first.GetCount(); - NewHead.PartNo = 0; - } else { - Y_VERIFY(NewHeadKey.Size == 0); - NewHeadKey = {key, res.second, CurrentTimestamp, 0}; - } - WriteCycleSize += write->GetValue().size(); - UpdateWriteBufferIsFullState(ctx.Now()); -} - - ui32 TPartition::NextChannel(bool isHead, ui32 blobSize) { if (isHead) { @@ -4777,153 +3525,6 @@ ui32 TPartition::NextChannel(bool isHead, ui32 blobSize) { return res; } -void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) { - - QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs()); - - ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck()); - } -} - -void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) { - FilterDeadlinedWrites(ctx); -} - -bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) { - - FilterDeadlinedWrites(ctx); - - if (!WriteQuota->CanExaust(now)) { // Waiting for partition quota. - SetDeadlinesForWrites(ctx); - return false; - } - - if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) { // Waiting for topic quota. - SetDeadlinesForWrites(ctx); - - if (StartTopicQuotaWaitTimeForCurrentBlob == TInstant::Zero() && !Requests.empty()) { - StartTopicQuotaWaitTimeForCurrentBlob = now; - } - return false; - } - - QuotaDeadline = TInstant::Zero(); - - if (Requests.empty()) - return false; - - Y_VERIFY(request->Record.CmdWriteSize() == 0); - Y_VERIFY(request->Record.CmdRenameSize() == 0); - Y_VERIFY(request->Record.CmdDeleteRangeSize() == 0); - const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() - ? 
ESourceIdFormat::Proto - : ESourceIdFormat::Raw; - TSourceIdWriter sourceIdWriter(format); - - bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter); - - if (headCleared) { - Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0); - for (ui32 i = 0; i < TotalLevels; ++i) { - DataKeysHead[i].Clear(); - } - } - - if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed - if (sourceIdWriter.GetSourceIdsToWrite().empty()) { - return request->Record.CmdWriteSize() > 0 - || request->Record.CmdRenameSize() > 0 - || request->Record.CmdDeleteRangeSize() > 0; - } else { - sourceIdWriter.FillRequest(request, Partition); - return true; - } - } - - sourceIdWriter.FillRequest(request, Partition); - - std::pair<TKey, ui32> res = GetNewWriteKey(headCleared); - const auto& key = res.first; - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " compactOffset " << key.GetOffset() << "," << key.GetCount() - << " HeadOffset " << Head.Offset << " endOffset " << EndOffset << " curOffset " - << NewHead.GetNextOffset() << " " << key.ToString() - << " size " << res.second << " WTime " << ctx.Now().MilliSeconds() - ); - - AddNewWriteBlob(res, request, headCleared, ctx); - return true; -} - -void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { - if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now()) - return; - - std::deque<TMessage> newRequests; - for (auto& w : Requests) { - if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) { - newRequests.emplace_back(std::move(w)); - continue; - } - if (w.IsWrite()) { - const auto& msg = w.GetWrite().Msg; - - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); - WriteInflightSize -= msg.Data.size(); - } - - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded"); - } - Requests = std::move(newRequests); - QuotaDeadline = TInstant::Zero(); - - UpdateWriteBufferIsFullState(ctx.Now()); -} - - -void TPartition::HandleWrites(const TActorContext& ctx) { - Become(&TThis::StateWrite); - - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - - Y_VERIFY(Head.PackedSize + NewHead.PackedSize <= 2 * MaxSizeCheck); - - TInstant now = ctx.Now(); - WriteCycleStartTime = now; - - bool haveData = false; - bool haveCheckDisk = false; - - if (!Requests.empty() && DiskIsFull) { - CancelAllWritesOnIdle(ctx); - AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); - haveCheckDisk = true; - } else { - haveData = ProcessWrites(request.Get(), now, ctx); - } - bool haveDrop = CleanUp(request.Get(), haveData, ctx); - - ProcessReserveRequests(ctx); - if (!haveData && !haveDrop && !haveCheckDisk) { //no data writed/deleted - if (!Requests.empty()) { //there could be change ownership requests that - bool res = ProcessWrites(request.Get(), now, ctx); - Y_VERIFY(!res); - } - Y_VERIFY(Requests.empty() || !WriteQuota->CanExaust(now) || WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left - AnswerCurrentWrites(ctx); //in case if all writes are already done - no answer will be called on kv write, no kv write at all - BecomeIdle(ctx); - return; - } - - WritesTotal.Inc(); - WriteBlobWithQuota(std::move(request)); -} - void TPartition::ProcessRead(const 
TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) { ui32 count = 0; @@ -5039,57 +3640,6 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { } } -void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) { - LOG_DEBUG_S( - TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE, - "Send write quota request." << - " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << - " Partition: " << Partition << "." << - " Amount: " << dataSize << "." << - " Cookie: " << cookie - ); - - Send(MakeQuoterServiceID(), - new TEvQuota::TEvRequest( - TEvQuota::EResourceOperator::And, - { TEvQuota::TResourceLeaf(TopicWriteQuoterPath, TopicWriteQuotaResourcePath, dataSize) }, - TDuration::Max()), - 0, - cookie); -} - -bool TPartition::WaitingForPreviousBlobQuota() const { - return TopicQuotaRequestCookie != 0; -} - -bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize) const { - return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize(); -} - -void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) { - // Request quota and write blob. - // Mirrored topics are not quoted in local dc. - const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty(); - if (size_t quotaRequestSize = skip ? 0 : GetQuotaRequestSize(*request)) { - // Request with data. We should check before attempting to write data whether we have enough quota. - Y_VERIFY(!WaitingForPreviousBlobQuota()); - - TopicQuotaRequestCookie = NextTopicWriteQuotaRequestCookie++; - RequestQuotaForWriteBlobRequest(quotaRequestSize, TopicQuotaRequestCookie); - } - - AddMetaKey(request.Get()); - - WriteStartTime = TActivationContext::Now(); - // Write blob -#if 1 - // PQ -> CacheProxy -> KV - Send(BlobCache, request.Release()); -#else - Send(Tablet, request.Release()); -#endif -} - void TPartition::CreateMirrorerActor() { Mirrorer = MakeHolder<TMirrorerInfo>( Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 9d28fd41a13..37b11d69a8c 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -104,7 +104,7 @@ private: void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); void AnswerCurrentWrites(const TActorContext& ctx); void CancelAllWritesOnIdle(const TActorContext& ctx); - void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode); + void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST); void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request); void CreateMirrorerActor(); void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index c10c421581d..185f9f18134 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ 
b/ydb/core/persqueue/partition_init.cpp @@ -6,6 +6,8 @@ namespace NKikimr::NPQ { static const ui32 LEVEL0 = 32; bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev); +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); // @@ -939,4 +941,44 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) { return !diskIsOk; } +static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, + TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + TKeyPrefix from(c, partition); + if (!key.empty()) { + Y_VERIFY(key.StartsWith(TStringBuf(from.Data(), from.Size()))); + from.Clear(); + from.Append(key.data(), key.size()); + } + range->SetFrom(from.Data(), from.Size()); + + TKeyPrefix to(c, partition + 1); + range->SetTo(to.Data(), to.Size()); + + if(includeData) + read->SetIncludeData(true); + + if (dropTmp) { + auto del = request->Record.AddCmdDeleteRange(); + auto range = del->MutableRange(); + TKeyPrefix from(TKeyPrefix::TypeTmpData, partition); + range->SetFrom(from.Data(), from.Size()); + + TKeyPrefix to(TKeyPrefix::TypeTmpData, partition + 1); + range->SetTo(to.Data(), to.Size()); + } + + ctx.Send(dst, request.Release()); +} + +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { + RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == ""); +} + +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { + RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_util.h b/ydb/core/persqueue/partition_util.h index e19e75434da..e9526f81f00 100644 --- a/ydb/core/persqueue/partition_util.h +++ b/ydb/core/persqueue/partition_util.h @@ -108,7 +108,6 @@ private: }; void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels); -void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); -void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); +NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i); } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp new file mode 100644 index 00000000000..b74d1a61328 --- /dev/null +++ b/ydb/core/persqueue/partition_write.cpp @@ -0,0 +1,1422 @@ +#include "event_helpers.h" +#include "mirrorer.h" +#include "partition_util.h" +#include "partition.h" +#include "read.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/blobstorage.h> +#include <ydb/core/base/counters.h> +#include <ydb/core/base/path.h> +#include <ydb/core/base/quoter.h> +#include <ydb/core/protos/counters_pq.pb.h> +#include <ydb/core/protos/msgbus.pb.h> +#include <ydb/library/persqueue/topic_parser/topic_parser.h> +#include <ydb/public/lib/base/msgbus.h> +#include <library/cpp/html/pcdata/pcdata.h> +#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/time_provider/time_provider.h> +#include 
<util/folder/path.h> +#include <util/string/escape.h> +#include <util/system/byteorder.h> + +namespace NKikimr::NPQ { + +static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; +static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; +static const ui32 MAX_INLINE_SIZE = 1000; + +void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { + THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); + NKikimrClient::TResponse& resp = response->Response; + resp.SetStatus(NMsgBusProxy::MSTATUS_OK); + resp.SetErrorCode(NPersQueue::NErrorCode::OK); + resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie); + ctx.Send(Tablet, response.Release()); +} + +void TPartition::ReplyWrite( + const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, + const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, + const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) { + Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); + Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo); + + THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); + NKikimrClient::TResponse& resp = response->Response; + resp.SetStatus(NMsgBusProxy::MSTATUS_OK); + resp.SetErrorCode(NPersQueue::NErrorCode::OK); + auto write = resp.MutablePartitionResponse()->AddCmdWriteResult(); + write->SetSourceId(sourceId); + write->SetSeqNo(seqNo); + write->SetWriteTimestampMS(writeTimestamp.MilliSeconds()); + if (totalParts > 1) + write->SetPartNo(partNo); + write->SetAlreadyWritten(already); + if (already) + write->SetMaxSeqNo(maxSeqNo); + write->SetOffset(offset); + + write->SetPartitionQuotedTimeMs(partitionQuotedTime.MilliSeconds()); + write->SetTopicQuotedTimeMs(topicQuotedTime.MilliSeconds()); + write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds()); + write->SetWriteTimeMs(writeTime.MilliSeconds()); + + ctx.Send(Tablet, response.Release()); +} + +void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { + UpdateAvailableSize(ctx); + HandleWrites(ctx); +} + +void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { + UpdateAvailableSize(ctx); +} + +void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { + for (const auto& w : Requests) { + ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full"); + if (w.IsWrite()) { + const auto& msg = w.GetWrite().Msg; + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); + WriteInflightSize -= msg.Data.size(); + } + } + + UpdateWriteBufferIsFullState(ctx.Now()); + Requests.clear(); + Y_VERIFY(Responses.empty()); + + WriteCycleSize = 0; + + ProcessReserveRequests(ctx); +} + +void TPartition::FailBadClient(const TActorContext& ctx) { + for (auto it = Owners.begin(); it != Owners.end();) { + it = DropOwner(it, ctx); + } + Y_VERIFY(Owners.empty()); + Y_VERIFY(ReservedSize == 0); + + for (const auto& w : Requests) { + ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); + if (w.IsWrite()) { + const auto& msg = w.GetWrite().Msg; + 
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); + WriteInflightSize -= msg.Data.size(); + } + } + UpdateWriteBufferIsFullState(ctx.Now()); + Requests.clear(); + for (const auto& w : Responses) { + ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); + if (w.IsWrite()) + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + } + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize); + Responses.clear(); + + ProcessChangeOwnerRequests(ctx); + ProcessReserveRequests(ctx); +} + +void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) { + + auto &owner = ev->Owner; + auto it = Owners.find(owner); + if (it == Owners.end()) { + Owners[owner]; + it = Owners.find(owner); + } + if (it->second.NeedResetOwner || ev->Force) { //change owner + Y_VERIFY(ReservedSize >= it->second.ReservedSize); + ReservedSize -= it->second.ReservedSize; + + it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie + //cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests' + EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + UpdateWriteBufferIsFullState(ctx.Now()); + ProcessReserveRequests(ctx); + } else { + it->second.WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release())); + } +} + + +THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { + Y_VERIFY(ReservedSize >= it->second.ReservedSize); + ReservedSize -= it->second.ReservedSize; + UpdateWriteBufferIsFullState(ctx.Now()); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + for (auto& ev : it->second.WaitToChangeOwner) { //this request maybe could be done right now + WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release())); + } + auto jt = it; + ++jt; + Owners.erase(it); + return jt; +} + +void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) { + bool res = OwnerPipes.insert(ev->Get()->PipeClient).second; + Y_VERIFY(res); + WaitToChangeOwner.push_back(ev->Release()); + ProcessChangeOwnerRequests(ctx); +} + +void TPartition::ProcessReserveRequests(const TActorContext& ctx) { + const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize(); + + while (!ReserveRequests.empty()) { + const TString& ownerCookie = ReserveRequests.front()->OwnerCookie; + const TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); + const ui64& size = ReserveRequests.front()->Size; + const ui64& cookie = ReserveRequests.front()->Cookie; + const bool& lastRequest = ReserveRequests.front()->LastRequest; + + auto it = Owners.find(owner); + if (it == Owners.end() || it->second.OwnerCookie != ownerCookie) { + ReplyError(ctx, cookie, NPersQueue::NErrorCode::BAD_REQUEST, "ReserveRequest from dead ownership session"); + ReserveRequests.pop_front(); + continue; + } + + const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize; + if (currentSize != 0 && currentSize + size > maxWriteInflightSize) { + LOG_DEBUG_S(ctx, 
NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched"); + break; + } + + if (WaitingForSubDomainQuota(ctx, currentSize)) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace"); + break; + } + + it->second.AddReserveRequest(size, lastRequest); + ReservedSize += size; + + ReplyOk(ctx, cookie); + + ReserveRequests.pop_front(); + } + UpdateWriteBufferIsFullState(ctx.Now()); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); +} + +void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) { + WriteBufferIsFullCounter.UpdateWorkingTime(now); + WriteBufferIsFullCounter.UpdateState(ReservedSize + WriteInflightSize + WriteCycleSize >= Config.GetPartitionConfig().GetBorderWriteInflightSize()); +} + + + +void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) { + const TString& ownerCookie = ev->Get()->OwnerCookie; + TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); + const ui64& messageNo = ev->Get()->MessageNo; + + auto it = Owners.find(owner); + if (it == Owners.end() || it->second.OwnerCookie != ownerCookie) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, "ReserveRequest from dead ownership session"); + return; + } + + if (messageNo != it->second.NextMessageNo) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "reorder in reserve requests, waiting " << it->second.NextMessageNo << ", but got " << messageNo); + DropOwner(it, ctx); + ProcessChangeOwnerRequests(ctx); + return; + } + + ++it->second.NextMessageNo; + ReserveRequests.push_back(ev->Release()); + ProcessReserveRequests(ctx); +} + +void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { + HandleOnWrite(ev, ctx); + HandleWrites(ctx); +} + +void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { + ui64 offset = EndOffset; + while (!Responses.empty()) { + const auto& response = Responses.front(); + + const TDuration quotedTime = response.QuotedTime; + const TDuration queueTime = response.QueueTime; + const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline; + + if (response.IsWrite()) { + const auto& writeResponse = response.GetWrite(); + const TString& s = writeResponse.Msg.SourceId; + const ui64& seqNo = writeResponse.Msg.SeqNo; + const ui16& partNo = writeResponse.Msg.PartNo; + const ui16& totalParts = writeResponse.Msg.TotalParts; + const TMaybe<ui64>& wrOffset = writeResponse.Offset; + + bool already = false; + + auto it = SourceIdStorage.GetInMemorySourceIds().find(s); + + ui64 maxSeqNo = 0; + ui64 maxOffset = 0; + + if (it != SourceIdStorage.GetInMemorySourceIds().end()) { + maxSeqNo = it->second.SeqNo; + maxOffset = it->second.Offset; + if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { + already = true; + } + } + + if (!already) { + if (wrOffset) { + Y_VERIFY(*wrOffset >= offset); + offset = *wrOffset; + } + } + if (!already && partNo + 1 == totalParts) { + if (it == SourceIdStorage.GetInMemorySourceIds().end()) { + TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); + SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp); + } else { + SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp)); + } + + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); + } + ReplyWrite( + ctx, writeResponse.Cookie, s, seqNo, partNo, totalParts, + already 
? maxOffset : offset, CurrentTimestamp, already, maxSeqNo, + quotedTime, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime + ); + LOG_DEBUG_S( + ctx, + NKikimrServices::PERSQUEUE, + "Answering for message sourceid: '" << EscapeC(s) << + "', Topic: '" << TopicConverter->GetClientsideName() << + "', Partition: " << Partition << + ", SeqNo: " << seqNo << ", partNo: " << partNo << + ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") + ); + if (PartitionWriteQuotaWaitCounter) { + PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds()); + } + + if (!already && partNo + 1 == totalParts) + ++offset; + } else if (response.IsOwnership()) { + const TString& ownerCookie = response.GetOwnership().OwnerCookie; + auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie)); + if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) { + ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie); + } else { + ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); + } + } else if (response.IsRegisterMessageGroup()) { + const auto& body = response.GetRegisterMessageGroup().Body; + + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } + + Y_VERIFY(body.AssignedOffset); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); + ReplyOk(ctx, response.GetCookie()); + } else if (response.IsDeregisterMessageGroup()) { + const auto& body = response.GetDeregisterMessageGroup().Body; + + SourceIdStorage.DeregisterSourceId(body.SourceId); + ReplyOk(ctx, response.GetCookie()); + } else if (response.IsSplitMessageGroup()) { + const auto& split = response.GetSplitMessageGroup(); + + for (const auto& body : split.Deregistrations) { + SourceIdStorage.DeregisterSourceId(body.SourceId); + } + + for (const auto& body : split.Registrations) { + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } + + Y_VERIFY(body.AssignedOffset); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); + } + + ReplyOk(ctx, response.GetCookie()); + } else { + Y_FAIL("Unexpected message"); + } + Responses.pop_front(); + } + TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero(); +} + +void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { + if (!CompactedKeys.empty()) + HeadKeys.clear(); + + if (NewHeadKey.Size > 0) { + while (!HeadKeys.empty() && + (HeadKeys.back().Key.GetOffset() > NewHeadKey.Key.GetOffset() || HeadKeys.back().Key.GetOffset() == NewHeadKey.Key.GetOffset() + && HeadKeys.back().Key.GetPartNo() >= NewHeadKey.Key.GetPartNo())) { + HeadKeys.pop_back(); + } + HeadKeys.push_back(NewHeadKey); + NewHeadKey = TDataKey{TKey{}, 0, TInstant::Zero(), 0}; + } + + if (CompactedKeys.empty() && NewHead.PackedSize == 0) { //Nothing writed at all + return; + } + + Y_VERIFY(EndOffset == Head.GetNextOffset()); + + if (!CompactedKeys.empty() || Head.PackedSize == 0) { //has compactedkeys or head is already empty + Head.PackedSize = 0; + Head.Offset = NewHead.Offset; + Head.PartNo = NewHead.PartNo; //no partNo at this point + Head.Batches.clear(); + } + + while (!CompactedKeys.empty()) { + const auto& ck = CompactedKeys.front(); + BodySize += ck.second; + Y_VERIFY(!ck.first.IsHead()); + ui64 lastOffset = DataKeysBody.empty() ? 
0 : (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()); + Y_VERIFY(lastOffset <= ck.first.GetOffset()); + if (DataKeysBody.empty()) { + StartOffset = ck.first.GetOffset() + (ck.first.GetPartNo() > 0 ? 1 : 0); + } else { + if (lastOffset < ck.first.GetOffset()) { + GapOffsets.push_back(std::make_pair(lastOffset, ck.first.GetOffset())); + GapSize += ck.first.GetOffset() - lastOffset; + } + } + DataKeysBody.push_back({ck.first, ck.second, ctx.Now(), DataKeysBody.empty() ? 0 : DataKeysBody.back().CumulativeSize + DataKeysBody.back().Size}); + + CompactedKeys.pop_front(); + } // head cleared, all data moved to body + + //append Head with newHead + while (!NewHead.Batches.empty()) { + Head.Batches.push_back(NewHead.Batches.front()); + NewHead.Batches.pop_front(); + } + Head.PackedSize += NewHead.PackedSize; + + if (Head.PackedSize > 0 && DataKeysBody.empty()) { + StartOffset = Head.Offset + (Head.PartNo > 0 ? 1 : 0); + } + + EndOffset = Head.GetNextOffset(); + NewHead.Clear(); + NewHead.Offset = EndOffset; + + CheckHeadConsistency(); + + UpdateUserInfoEndOffset(ctx.Now()); +} + +void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) { + HandleWriteResponse(ctx); +} + +void TPartition::HandleWriteResponse(const TActorContext& ctx) { + + Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite); + ui64 prevEndOffset = EndOffset; + + ui32 totalLatencyMs = (ctx.Now() - WriteCycleStartTime).MilliSeconds(); + ui32 writeLatencyMs = (ctx.Now() - WriteStartTime).MilliSeconds(); + + WriteLatency.IncFor(writeLatencyMs, 1); + if (writeLatencyMs >= AppData(ctx)->PQConfig.GetWriteLatencyBigMs()) { + SLIBigLatency.Inc(); + } + + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize); + TabletCounters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize); + TabletCounters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize); + if (BytesWrittenGrpc) + BytesWrittenGrpc.Inc(WriteNewSizeInternal); + if (BytesWrittenTotal) + BytesWrittenTotal.Inc(WriteNewSize); + + if (BytesWrittenUncompressed) + BytesWrittenUncompressed.Inc(WriteNewSizeUncompressed); + if (BytesWrittenComp) + BytesWrittenComp.Inc(WriteCycleSize); + if (MsgsWrittenGrpc) + MsgsWrittenGrpc.Inc(WriteNewMessagesInternal); + if (MsgsWrittenTotal) + MsgsWrittenTotal.Inc(WriteNewMessages); + + //All ok + auto now = ctx.Now(); + const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig(); + if (quotingConfig.GetTopicWriteQuotaEntityToLimit() == NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) { + WriteQuota->Exaust(WriteNewSize, now); + } else { + WriteQuota->Exaust(WriteCycleSize, now); + } + for (auto& avg : AvgWriteBytes) { + avg.Update(WriteNewSize, now); + } + for (auto& avg : AvgQuotaBytes) { + avg.Update(WriteNewSize, now); + } + + WriteCycleSize = 0; + WriteNewSize = 0; + WriteNewSizeInternal = 0; + WriteNewSizeUncompressed = 0; + WriteNewMessages = 0; + WriteNewMessagesInternal = 0; + UpdateWriteBufferIsFullState(now); + + AnswerCurrentWrites(ctx); + SyncMemoryStateWithKVState(ctx); + + //if EndOffset changed there could be subscriptions witch could be completed + TVector<std::pair<TReadInfo, ui64>> reads = Subscriber.GetReads(EndOffset); + for (auto& read : reads) { + Y_VERIFY(EndOffset > read.first.Offset); + ProcessRead(ctx, 
std::move(read.first), read.second, true); + } + //same for read requests + ProcessHasDataRequests(ctx); + + ProcessTimestampsForNewData(prevEndOffset, ctx); + + HandleWrites(ctx); +} + +void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { + ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){ + return sum + msg.Data.size(); + }); + + bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom(); + + if (mirroredPartition && !ev->Get()->OwnerCookie.empty()) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "Write to mirrored topic is forbiden "); + return; + } + + ui64 decReservedSize = 0; + TStringBuf owner; + + if (!mirroredPartition && !ev->Get()->IsDirectWrite) { + owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie); + auto it = Owners.find(owner); + + if (it == Owners.end() || it->second.NeedResetOwner) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, + TStringBuilder() << "new GetOwnership request needed for owner " << owner); + return; + } + + if (it->second.SourceIdDeleted) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, + TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. " + << "Current count of sourceIds is " << SourceIdStorage.GetInMemorySourceIds().size() << " and limit is " << Config.GetPartitionConfig().GetSourceIdMaxCounts() + << ", current minimum sourceid timestamp(Ms) is " << SourceIdStorage.MinAvailableTimestamp(ctx.Now()).MilliSeconds() + << " and border timestamp(Ms) is " << ((ctx.Now() - TInstant::Seconds(Config.GetPartitionConfig().GetSourceIdLifetimeSeconds())).MilliSeconds())); + return; + } + + if (it->second.OwnerCookie != ev->Get()->OwnerCookie) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, + TStringBuilder() << "incorrect ownerCookie " << ev->Get()->OwnerCookie << ", must be " << it->second.OwnerCookie); + return; + } + + if (ev->Get()->MessageNo != it->second.NextMessageNo) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "reorder in requests, waiting " << it->second.NextMessageNo << ", but got " << ev->Get()->MessageNo); + DropOwner(it, ctx); + return; + } + + ++it->second.NextMessageNo; + decReservedSize = it->second.DecReservedSize(); + } + + ReservedSize -= decReservedSize; + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + + TMaybe<ui64> offset = ev->Get()->Offset; + + if (WriteInflightSize > Config.GetPartitionConfig().GetMaxWriteInflightSize()) { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); + + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD, + TStringBuilder() << "try later. Write inflight limit reached. " + << WriteInflightSize << " vs. maximum " << Config.GetPartitionConfig().GetMaxWriteInflightSize()); + return; + } + for (const auto& msg: ev->Get()->Msgs) { + //this is checked in pq_impl when forming EvWrite request + Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite); + Y_VERIFY(!msg.Data.empty()); + + if (msg.SeqNo > (ui64)Max<i64>()) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Request to write wrong SeqNo. 
Partition " + << Partition << " sourceId '" << EscapeC(msg.SourceId) << "' seqno " << msg.SeqNo); + + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "wrong SeqNo " << msg.SeqNo); + return; + } + + ui32 sz = msg.Data.size() + msg.SourceId.size() + TClientBlob::OVERHEAD; + + if (sz > MAX_BLOB_PART_SIZE) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "too big message " << sz << " vs. maximum " << MAX_BLOB_PART_SIZE); + return; + } + + if (!mirroredPartition) { + SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner); + } + } + + const ui64 maxSize = Config.GetPartitionConfig().GetMaxSizeInPartition(); + const ui64 maxCount = Config.GetPartitionConfig().GetMaxCountInPartition(); + if (EndOffset - StartOffset >= maxCount || Size() >= maxSize) { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); + + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL, + Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, Size(), maxSize)); + return; + } + ui64 size = 0; + for (auto& msg: ev->Get()->Msgs) { + size += msg.Data.size(); + bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts; + EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx); + if (offset && needToChangeOffset) + ++*offset; + } + WriteInflightSize += size; + + // TODO: remove decReservedSize == 0 + Y_VERIFY(size <= decReservedSize || decReservedSize == 0); + UpdateWriteBufferIsFullState(ctx.Now()); +} + +void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + HandleOnWrite(ev, ctx); + HandleWrites(ctx); +} + +void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + const auto& body = ev->Get()->Body; + + auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); + if (it != SourceIdStorage.GetInMemorySourceIds().end()) { + if (!it->second.Explicit) { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + "Trying to register implicitly registered SourceId"); + } + + switch (it->second.State) { + case TSourceIdInfo::EState::Registered: + return ReplyOk(ctx, ev->Get()->Cookie); + case TSourceIdInfo::EState::PendingRegistration: + if (!body.AfterSplit) { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + "AfterSplit must be set"); + } + break; + default: + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::ERROR, + TStringBuilder() << "Unknown state: " << static_cast<ui32>(it->second.State)); + } + } else if (body.AfterSplit) { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + "SourceId not found, registration cannot be completed"); + } + + EmplaceRequest(TRegisterMessageGroupMsg(*ev->Get()), ctx); +} + +void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + HandleOnWrite(ev, ctx); + HandleWrites(ctx); +} + +void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + const auto& body = ev->Get()->Body; + + auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); + if (it == SourceIdStorage.GetInMemorySourceIds().end()) { + return ReplyError(ctx, 
ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, + "SourceId doesn't exist"); + } + + EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx); +} + +void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { + HandleOnWrite(ev, ctx); + HandleWrites(ctx); +} + +void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { + if (ev->Get()->Deregistrations.size() > 1) { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + TStringBuilder() << "Currently, single deregistrations are supported"); + } + + TSplitMessageGroupMsg msg(ev->Get()->Cookie); + + for (auto& body : ev->Get()->Deregistrations) { + auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); + if (it != SourceIdStorage.GetInMemorySourceIds().end()) { + msg.Deregistrations.push_back(std::move(body)); + } else { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, + "SourceId doesn't exist"); + } + } + + for (auto& body : ev->Get()->Registrations) { + auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); + if (it == SourceIdStorage.GetInMemorySourceIds().end()) { + msg.Registrations.push_back(std::move(body)); + } else { + if (!it->second.Explicit) { + return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, + "Trying to register implicitly registered SourceId"); + } + } + } + + EmplaceRequest(std::move(msg), ctx); +} + +std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) { + std::pair<TKey, ui32> res({key, size}); + ui32 x = headCleared ? 0 : Head.PackedSize; + Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u, [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == NewHead.PackedSize + x); + for (auto it = DataKeysHead.rbegin(); it != DataKeysHead.rend(); ++it) { + auto jt = it; ++jt; + if (it->NeedCompaction()) { + res = it->Compact(); + if (jt != DataKeysHead.rend()) { + jt->AddKey(res.first, res.second); + } + } else { + Y_VERIFY(jt == DataKeysHead.rend() || !jt->NeedCompaction()); //compact must start from last level, not internal + } + Y_VERIFY(!it->NeedCompaction()); + } + Y_VERIFY(res.second >= size); + Y_VERIFY(res.first.GetOffset() < key.GetOffset() || res.first.GetOffset() == key.GetOffset() && res.first.GetPartNo() <= key.GetPartNo()); + return res; +} + + +void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { + while (!WaitToChangeOwner.empty()) { + auto &ev = WaitToChangeOwner.front(); + if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe + ProcessChangeOwnerRequest(ev.Release(), ctx); + } else { + ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::ERROR, "Pipe for GetOwnershipRequest is already dead"); + } + WaitToChangeOwner.pop_front(); + } + if (CurrentStateFunc() == &TThis::StateIdle) { + HandleWrites(ctx); + } +} + +void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode) { + ReplyError(ctx, p.Cookie, errorCode, errorStr); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); + FailBadClient(ctx); + NewHead.Clear(); + NewHead.Offset = EndOffset; + sourceIdWriter.Clear(); + 
request->Record.Clear(); + PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); + CompactedKeys.clear(); + + WriteCycleSize = 0; +} + +bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, + TSourceIdWriter& sourceIdWriter) { + + ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; + + WriteCycleSize = 0; + WriteNewSize = 0; + WriteNewSizeUncompressed = 0; + WriteNewMessages = 0; + UpdateWriteBufferIsFullState(ctx.Now()); + CurrentTimestamp = ctx.Now(); + + NewHead.Offset = EndOffset; + NewHead.PartNo = 0; + NewHead.PackedSize = 0; + + Y_VERIFY(NewHead.Batches.empty()); + + bool oldPartsCleared = false; + bool headCleared = (Head.PackedSize == 0); + + + //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too) + //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs + //otherwise write this batch as is to head; + + while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big + auto pp = Requests.front(); + Requests.pop_front(); + + if (!pp.IsWrite()) { + if (pp.IsRegisterMessageGroup()) { + auto& body = pp.GetRegisterMessageGroup().Body; + + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } + + body.AssignedOffset = curOffset; + sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange)); + } else if (pp.IsDeregisterMessageGroup()) { + sourceIdWriter.DeregisterSourceId(pp.GetDeregisterMessageGroup().Body.SourceId); + } else if (pp.IsSplitMessageGroup()) { + for (auto& body : pp.GetSplitMessageGroup().Deregistrations) { + sourceIdWriter.DeregisterSourceId(body.SourceId); + } + + for (auto& body : pp.GetSplitMessageGroup().Registrations) { + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } + + body.AssignedOffset = curOffset; + sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange), true); + } + } else { + Y_VERIFY(pp.IsOwnership()); + } + + EmplaceResponse(std::move(pp), ctx); + continue; + } + + Y_VERIFY(pp.IsWrite()); + auto& p = pp.GetWrite(); + + WriteInflightSize -= p.Msg.Data.size(); + + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp); + //check already written + + ui64 poffset = p.Offset ? *p.Offset : curOffset; + + auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId); + auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId); + if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) { + bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end()); + bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end()); + + if (poffset >= curOffset) { + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Already written message. Topic: '" << TopicConverter->GetClientsideName() + << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) + << "'. Message seqNo = " << p.Msg.SeqNo + << ". 
Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) + << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) + << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset + ); + + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); + } else { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); + } + + TString().swap(p.Msg.Data); + EmplaceResponse(std::move(pp), ctx); + continue; + } + + if (poffset < curOffset) { //too small offset + CancelAllWritesOnWrite(ctx, request, + TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo + << " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset, + p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); + return false; + } + + Y_VERIFY(poffset >= curOffset); + + bool needCompactHead = poffset > curOffset; + if (needCompactHead) { //got gap + if (p.Msg.PartNo != 0) { //gap can't be inside of partitioned message + CancelAllWritesOnWrite(ctx, request, + TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo + << " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset " + << poffset << ", must be " << curOffset, + p, sourceIdWriter); + return false; + } + curOffset = poffset; + } + + if (p.Msg.PartNo == 0) { //create new PartitionedBlob + //there could be parts from previous owner, clear them + if (!oldPartsCleared) { + oldPartsCleared = true; + auto del = request->Record.AddCmdDeleteRange(); + auto range = del->MutableRange(); + TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition); + range->SetFrom(from.Data(), from.Size()); + TKeyPrefix to(TKeyPrefix::TypeTmpData, Partition + 1); + range->SetTo(to.Data(), to.Size()); + } + + if (PartitionedBlob.HasFormedBlobs()) { + //clear currently-writed blobs + auto oldCmdWrite = request->Record.GetCmdWrite(); + request->Record.ClearCmdWrite(); + for (ui32 i = 0; i < (ui32)oldCmdWrite.size(); ++i) { + TKey key(oldCmdWrite.Get(i).GetKey()); + if (key.GetType() != TKeyPrefix::TypeTmpData) { + request->Record.AddCmdWrite()->CopyFrom(oldCmdWrite.Get(i)); + } + } + } + PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo, + p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead, + headCleared, needCompactHead, MaxBlobSize); + } + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + << " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) << + "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo + ); + TString s; + if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) { + //this must not be happen - client sends gaps, fail this client till the end + CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter); + //now no changes will leak + return false; + } + + WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size(); + WriteNewSizeInternal += p.Msg.External ? 
0 : (p.Msg.SourceId.size() + p.Msg.Data.size()); + WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size(); + if (p.Msg.PartNo == 0) { + ++WriteNewMessages; + if (!p.Msg.External) + ++WriteNewMessagesInternal; + } + + TMaybe<TPartData> partData; + if (p.Msg.TotalParts > 1) { //this is multi-part message + partData = TPartData(p.Msg.PartNo, p.Msg.TotalParts, p.Msg.TotalSize); + } + WriteTimestamp = ctx.Now(); + WriteTimestampEstimate = p.Msg.WriteTimestamp > 0 ? TInstant::MilliSeconds(p.Msg.WriteTimestamp) : WriteTimestamp; + TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.Data, std::move(partData), WriteTimestampEstimate, + TInstant::MilliSeconds(p.Msg.CreateTimestamp == 0 ? curOffset : p.Msg.CreateTimestamp), + p.Msg.UncompressedSize, p.Msg.PartitionKey, p.Msg.ExplicitHashKey); //remove curOffset when LB will report CTime + + const ui64 writeLagMs = + (WriteTimestamp - TInstant::MilliSeconds(p.Msg.CreateTimestamp)).MilliSeconds(); + WriteLagMs.Update(writeLagMs, WriteTimestamp); + if (InputTimeLag) { + InputTimeLag->IncFor(writeLagMs, 1); + if (p.Msg.PartNo == 0) { + MessageSize->IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1); + } + } + + bool lastBlobPart = blob.IsLastPart(); + + //will return compacted tmp blob + std::pair<TKey, TString> newWrite = PartitionedBlob.Add(std::move(blob)); + + if (!newWrite.second.empty()) { + auto write = request->Record.AddCmdWrite(); + write->SetKey(newWrite.first.Data(), newWrite.first.Size()); + write->SetValue(newWrite.second); + Y_VERIFY(!newWrite.first.IsHead()); + auto channel = GetChannel(NextChannel(newWrite.first.IsHead(), newWrite.second.Size())); + write->SetStorageChannel(channel); + write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + + TKey resKey = newWrite.first; + resKey.SetType(TKeyPrefix::TypeData); + write->SetKeyToCache(resKey.Data(), resKey.Size()); + WriteCycleSize += newWrite.second.size(); + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicConverter->GetClientsideName() << + "' partition " << Partition << + " part blob sourceId '" << EscapeC(p.Msg.SourceId) << + "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << + " result is " << TStringBuf(newWrite.first.Data(), newWrite.first.Size()) << + " size " << newWrite.second.size() + ); + } + + if (lastBlobPart) { + Y_VERIFY(PartitionedBlob.IsComplete()); + ui32 curWrites = 0; + for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs + TKey key(request->Record.GetCmdWrite(i).GetKey()); + if (key.GetType() == TKeyPrefix::TypeTmpData) { + key.SetType(TKeyPrefix::TypeData); + request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size())); + ++curWrites; + } + } + Y_VERIFY(curWrites <= PartitionedBlob.GetFormedBlobs().size()); + auto formedBlobs = PartitionedBlob.GetFormedBlobs(); + for (ui32 i = 0; i < formedBlobs.size(); ++i) { + const auto& x = formedBlobs[i]; + if (i + curWrites < formedBlobs.size()) { //this KV pair is already writed, rename needed + auto rename = request->Record.AddCmdRename(); + TKey key = x.first; + rename->SetOldKey(TString(key.Data(), key.Size())); + key.SetType(TKeyPrefix::TypeData); + rename->SetNewKey(TString(key.Data(), key.Size())); + } + if (!DataKeysBody.empty() && CompactedKeys.empty()) { + Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.first.GetOffset(), + "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, x.first.ToString().c_str()); + } + 
LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + << " " << x.first.ToString() << " size " << x.second << " WTime " << ctx.Now().MilliSeconds() + ); + + CompactedKeys.push_back(x); + CompactedKeys.back().first.SetType(TKeyPrefix::TypeData); + } + if (PartitionedBlob.HasFormedBlobs()) { //Head and newHead are cleared + headCleared = true; + NewHead.Clear(); + NewHead.Offset = PartitionedBlob.GetOffset(); + NewHead.PartNo = PartitionedBlob.GetHeadPartNo(); + NewHead.PackedSize = 0; + } + ui32 countOfLastParts = 0; + for (auto& x : PartitionedBlob.GetClientBlobs()) { + if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) { + NewHead.Batches.emplace_back(curOffset, x.GetPartNo(), TVector<TClientBlob>()); + NewHead.PackedSize += GetMaxHeaderSize(); //upper bound for packed size + } + if (x.IsLastPart()) { + ++countOfLastParts; + } + Y_VERIFY(!NewHead.Batches.back().Packed); + NewHead.Batches.back().AddBlob(x); + NewHead.PackedSize += x.GetBlobSize(); + if (NewHead.Batches.back().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) { + NewHead.Batches.back().Pack(); + NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob + + NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound + NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize(); + } + } + + Y_VERIFY(countOfLastParts == 1); + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + << " part blob complete sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo + << " partNo " << p.Msg.PartNo << " FormedBlobsCount " << PartitionedBlob.GetFormedBlobs().size() + << " NewHead: " << NewHead + ); + + if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) { + sourceIdWriter.RegisterSourceId(p.Msg.SourceId, p.Msg.SeqNo, curOffset, CurrentTimestamp); + } else { + sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(p.Msg.SeqNo, curOffset, CurrentTimestamp)); + } + + ++curOffset; + PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); + } + TString().swap(p.Msg.Data); + EmplaceResponse(std::move(pp), ctx); + } + + UpdateWriteBufferIsFullState(ctx.Now()); + + if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) { + NewHead.Batches.back().Pack(); + NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob + + NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound + NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize(); + } + + Y_VERIFY((headCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl + MaxWriteResponsesSize = Max<ui32>(MaxWriteResponsesSize, Responses.size()); + + return headCleared; +} + + +std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) { + bool needCompaction = false; + ui32 HeadSize = headCleared ? 
0 : Head.PackedSize;
+    if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize
+                                                >= Min<ui32>(MaxBlobSize, Config.GetPartitionConfig().GetLowWatermark()))
+        needCompaction = true;
+
+    if (PartitionedBlob.IsInited()) { //has active partitioned blob - compaction is forbidden, head and newHead will be compacted when this partitioned blob is finished
+        needCompaction = false;
+    }
+
+    Y_VERIFY(NewHead.PackedSize > 0 || needCompaction); //something must be here
+
+    TKey key(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount(), !needCompaction);
+
+    if (NewHead.PackedSize > 0)
+        DataKeysHead[TotalLevels - 1].AddKey(key, NewHead.PackedSize);
+    Y_VERIFY(HeadSize + NewHead.PackedSize <= 3 * MaxSizeCheck);
+
+    std::pair<TKey, ui32> res;
+
+    if (needCompaction) { //compact all
+        for (ui32 i = 0; i < TotalLevels; ++i) {
+            DataKeysHead[i].Clear();
+        }
+        if (!headCleared) { //compacted blob must contain both head and NewHead
+            key = TKey(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(),
+                       Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount(), false);
+        } //otherwise KV blob is not from head (!key.IsHead()) and contains only new data from NewHead
+        res = std::make_pair(key, HeadSize + NewHead.PackedSize);
+    } else {
+        res = Compact(key, NewHead.PackedSize, headCleared);
+        Y_VERIFY(res.first.IsHead()); //may compact some KV blobs from head, but new KV blob is from head too
+        Y_VERIFY(res.second >= NewHead.PackedSize); //at least the new data must be written
+    }
+    Y_VERIFY(res.second <= MaxBlobSize);
+    return res;
+}
+
+void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) {
+    const auto& key = res.first;
+
+    TString valueD;
+    valueD.reserve(res.second);
+    ui32 pp = Head.FindPos(key.GetOffset(), key.GetPartNo());
+    if (pp < Max<ui32>() && key.GetOffset() < EndOffset) { //this batch truly contains this offset
+        Y_VERIFY(pp < Head.Batches.size());
+        Y_VERIFY(Head.Batches[pp].GetOffset() == key.GetOffset());
+        Y_VERIFY(Head.Batches[pp].GetPartNo() == key.GetPartNo());
+        for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here
+            Y_VERIFY(Head.Batches[pp].Packed);
+            valueD += Head.Batches[pp].Serialize();
+        }
+    }
+    for (auto& b : NewHead.Batches) {
+        Y_VERIFY(b.Packed);
+        valueD += b.Serialize();
+    }
+
+    Y_VERIFY(res.second >= valueD.size());
+
+    if (res.second > valueD.size() && res.first.IsHead()) { //change to real size if real packed size is smaller
+
+        Y_FAIL("Can't be here right now, only after merging of small batches");
+
+        for (auto it = DataKeysHead.rbegin(); it != DataKeysHead.rend(); ++it) {
+            if (it->KeysCount() > 0) {
+                auto res2 = it->PopBack();
+                Y_VERIFY(res2 == res);
+                res2.second = valueD.size();
+
+                DataKeysHead[TotalLevels - 1].AddKey(res2.first, res2.second);
+
+                res2 = Compact(res2.first, res2.second, headCleared);
+
+                Y_VERIFY(res2.first == res.first);
+                Y_VERIFY(res2.second == valueD.size());
+                res = res2;
+                break;
+            }
+        }
+    }
+
+    Y_VERIFY(res.second == valueD.size() || res.first.IsHead());
+
+    CheckBlob(key, valueD);
+
+    auto write = request->Record.AddCmdWrite();
+    write->SetKey(key.Data(), key.Size());
+    write->SetValue(valueD);
+
+    if (!key.IsHead())
+        write->SetKeyToCache(key.Data(), key.Size());
+
+    bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE;
+
+    if (isInline)
+        write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
+    else {
+        auto channel = GetChannel(NextChannel(key.IsHead(), valueD.size()));
+        write->SetStorageChannel(channel);
+        write->SetTactic(AppData(ctx)->PQConfig.GetTactic());
+    }
+
+    //Need to clear all compacted blobs
+    TKey k = CompactedKeys.empty() ? key : CompactedKeys.front().first;
+    ClearOldHead(k.GetOffset(), k.GetPartNo(), request);
+
+    if (!key.IsHead()) {
+        if (!DataKeysBody.empty() && CompactedKeys.empty()) {
+            Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= key.GetOffset(),
+                     "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, key.ToString().c_str());
+        }
+        CompactedKeys.push_back(res);
+        NewHead.Clear();
+        NewHead.Offset = res.first.GetOffset() + res.first.GetCount();
+        NewHead.PartNo = 0;
+    } else {
+        Y_VERIFY(NewHeadKey.Size == 0);
+        NewHeadKey = {key, res.second, CurrentTimestamp, 0};
+    }
+    WriteCycleSize += write->GetValue().size();
+    UpdateWriteBufferIsFullState(ctx.Now());
+}
+
+void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
+    if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
+
+        QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
+
+        ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
+    }
+}
+
+void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) {
+    FilterDeadlinedWrites(ctx);
+}
+
+bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) {
+
+    FilterDeadlinedWrites(ctx);
+
+    if (!WriteQuota->CanExaust(now)) { // Waiting for partition quota.
+        SetDeadlinesForWrites(ctx);
+        return false;
+    }
+
+    if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) { // Waiting for topic quota.
+        SetDeadlinesForWrites(ctx);
+
+        if (StartTopicQuotaWaitTimeForCurrentBlob == TInstant::Zero() && !Requests.empty()) {
+            StartTopicQuotaWaitTimeForCurrentBlob = now;
+        }
+        return false;
+    }
+
+    QuotaDeadline = TInstant::Zero();
+
+    if (Requests.empty())
+        return false;
+
+    Y_VERIFY(request->Record.CmdWriteSize() == 0);
+    Y_VERIFY(request->Record.CmdRenameSize() == 0);
+    Y_VERIFY(request->Record.CmdDeleteRangeSize() == 0);
+    const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
+        ? ESourceIdFormat::Proto
+        : ESourceIdFormat::Raw;
+    TSourceIdWriter sourceIdWriter(format);
+
+    bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter);
+
+    if (headCleared) {
+        Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0);
+        for (ui32 i = 0; i < TotalLevels; ++i) {
+            DataKeysHead[i].Clear();
+        }
+    }
+
+    if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs written
+        if (sourceIdWriter.GetSourceIdsToWrite().empty()) {
+            return request->Record.CmdWriteSize() > 0
+                || request->Record.CmdRenameSize() > 0
+                || request->Record.CmdDeleteRangeSize() > 0;
+        } else {
+            sourceIdWriter.FillRequest(request, Partition);
+            return true;
+        }
+    }
+
+    sourceIdWriter.FillRequest(request, Partition);
+
+    std::pair<TKey, ui32> res = GetNewWriteKey(headCleared);
+    const auto& key = res.first;
+
+    LOG_DEBUG_S(
+            ctx, NKikimrServices::PERSQUEUE,
+            "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+                << " compactOffset " << key.GetOffset() << "," << key.GetCount()
+                << " HeadOffset " << Head.Offset << " endOffset " << EndOffset << " curOffset "
+                << NewHead.GetNextOffset() << " " << key.ToString()
+                << " size " << res.second << " WTime " << ctx.Now().MilliSeconds()
+    );
+
+    AddNewWriteBlob(res, request, headCleared, ctx);
+    return true;
+}
+
+void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
+    if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now())
+        return;
+
+    std::deque<TMessage> newRequests;
+    for (auto& w : Requests) {
+        if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) {
+            newRequests.emplace_back(std::move(w));
+            continue;
+        }
+        if (w.IsWrite()) {
+            const auto& msg = w.GetWrite().Msg;
+
+            TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+            TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
+            WriteInflightSize -= msg.Data.size();
+        }
+
+        ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
+    }
+    Requests = std::move(newRequests);
+    QuotaDeadline = TInstant::Zero();
+
+    UpdateWriteBufferIsFullState(ctx.Now());
+}
+
+
+void TPartition::HandleWrites(const TActorContext& ctx) {
+    Become(&TThis::StateWrite);
+
+    THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+
+    Y_VERIFY(Head.PackedSize + NewHead.PackedSize <= 2 * MaxSizeCheck);
+
+    TInstant now = ctx.Now();
+    WriteCycleStartTime = now;
+
+    bool haveData = false;
+    bool haveCheckDisk = false;
+
+    if (!Requests.empty() && DiskIsFull) {
+        CancelAllWritesOnIdle(ctx);
+        AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
+        haveCheckDisk = true;
+    } else {
+        haveData = ProcessWrites(request.Get(), now, ctx);
+    }
+    bool haveDrop = CleanUp(request.Get(), haveData, ctx);
+
+    ProcessReserveRequests(ctx);
+    if (!haveData && !haveDrop && !haveCheckDisk) { //no data written/deleted
+        if (!Requests.empty()) { //there could be change ownership requests that
+            bool res = ProcessWrites(request.Get(), now, ctx);
+            Y_VERIFY(!res);
+        }
+        Y_VERIFY(Requests.empty() || !WriteQuota->CanExaust(now) || WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left
+        AnswerCurrentWrites(ctx); //in case all writes are already done - no answer will be called on kv write, no kv write at all
+        BecomeIdle(ctx);
+        return;
+    }
+
+    WritesTotal.Inc();
+    WriteBlobWithQuota(std::move(request));
+}
+
+void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) {
+    LOG_DEBUG_S(
+            TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE,
+            "Send write quota request." <<
+            " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+            " Partition: " << Partition << "." <<
+            " Amount: " << dataSize << "." <<
+            " Cookie: " << cookie
+    );
+
+    Send(MakeQuoterServiceID(),
+        new TEvQuota::TEvRequest(
+            TEvQuota::EResourceOperator::And,
+            { TEvQuota::TResourceLeaf(TopicWriteQuoterPath, TopicWriteQuotaResourcePath, dataSize) },
+            TDuration::Max()),
+        0,
+        cookie);
+}
+
+bool TPartition::WaitingForPreviousBlobQuota() const {
+    return TopicQuotaRequestCookie != 0;
+}
+
+bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize) const {
+    return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize();
+}
+
+void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) {
+    // Request quota and write blob.
+    // Mirrored topics are not quoted in local dc.
+    const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();
+    if (size_t quotaRequestSize = skip ? 0 : GetQuotaRequestSize(*request)) {
+        // Request with data. We should check before attempting to write data whether we have enough quota.
+        Y_VERIFY(!WaitingForPreviousBlobQuota());
+
+        TopicQuotaRequestCookie = NextTopicWriteQuotaRequestCookie++;
+        RequestQuotaForWriteBlobRequest(quotaRequestSize, TopicQuotaRequestCookie);
+    }
+
+    AddMetaKey(request.Get());
+
+    WriteStartTime = TActivationContext::Now();
+    // Write blob
+#if 1
+    // PQ -> CacheProxy -> KV
+    Send(BlobCache, request.Release());
+#else
+    Send(Tablet, request.Release());
+#endif
+}
+
+} // namespace NKikimr::NPQ