diff options
author | chertus <azuikov@ydb.tech> | 2023-06-01 17:14:34 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-06-01 17:14:34 +0300 |
commit | 0fa4c2d7731ee14f0d45501830d5d66377530e26 (patch) | |
tree | b3b06bdfc2d477858530b6230f9eb9ce2e9ac253 | |
parent | 308587966b506c17c0a15004ee2181bfc4a5cb37 (diff) | |
download | ydb-0fa4c2d7731ee14f0d45501830d5d66377530e26.tar.gz |
better yellow flags check in ColumnShard
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 102 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_private_events.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/compaction_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.h | 32 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/eviction_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/indexing_actor.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/write_actor.cpp | 37 |
11 files changed, 132 insertions, 103 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 096537cc12c..4ec86e165b4 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -228,7 +228,8 @@ struct TEvColumnShard { } }; - struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> { + struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> + , public NColumnShard::TPutStatus { TEvWrite() = default; TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId, @@ -262,13 +263,10 @@ struct TEvColumnShard { return ActorIdFromProto(Record.GetSource()); } - NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN; NColumnShard::TUnifiedBlobId BlobId; std::shared_ptr<arrow::RecordBatch> WrittenBatch; NColumnShard::TBlobBatch BlobBatch; NColumnShard::TUsage ResourceUsage; - TVector<ui32> YellowMoveChannels; - TVector<ui32> YellowStopChannels; ui64 MaxSmallBlobSize; }; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 33ee2942e37..0dcb1a12e56 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -44,7 +44,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS; auto& logoBlobId = Ev->Get()->BlobId; - auto putStatus = Ev->Get()->PutStatus; + auto putStatus = Ev->Get()->GetPutStatus(); Y_VERIFY(putStatus == NKikimrProto::OK); Y_VERIFY(logoBlobId.IsValid()); @@ -129,7 +129,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { LastAccessTime = TAppData::TimeProvider->Now(); - OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); + OnYellowChannels(*ev->Get()); auto& record = Proto(ev->Get()); auto& data = record.GetData(); @@ -137,7 +137,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ui64 metaShard = record.GetTxInitiator(); ui64 writeId = record.GetWriteId(); TString dedupId = record.GetDedupId(); - auto putStatus = ev->Get()->PutStatus; + auto putStatus = ev->Get()->GetPutStatus(); bool isWritable = TablesManager.IsWritableTable(tableId); bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable; diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 427259b209a..82754ad0a72 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -58,7 +58,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) << ") changes: " << *changes << " at tablet " << Self->TabletID()); bool ok = false; - if (Ev->Get()->PutStatus == NKikimrProto::OK) { + if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) { NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId); Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot); @@ -246,52 +246,18 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); } - Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes); - - if (changes->IsInsert()) { - Self->ActiveIndexing = false; - - Self->IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL); - Self->IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten); - Self->IncCounter(COUNTER_INDEXING_TIME, Ev->Get()->Duration.MilliSeconds()); - } else if (changes->IsCompaction()) { - Self->ActiveCompaction--; - - Y_VERIFY(changes->CompactionInfo); - bool inGranule = changes->CompactionInfo->InGranule(); - - if (inGranule) { - Self->IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL); - Self->IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten); - } else { - Self->IncCounter(ok ? COUNTER_SPLIT_COMPACTION_SUCCESS : COUNTER_SPLIT_COMPACTION_FAIL); - Self->IncCounter(COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, bytesWritten); - } - Self->IncCounter(COUNTER_COMPACTION_TIME, Ev->Get()->Duration.MilliSeconds()); - } else if (changes->IsCleanup()) { - Self->ActiveCleanup = false; + if (changes->IsCleanup()) { TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Cleanup() : TBackgroundActivity::None(); - Self->BlobManager->GetCleanupBlobs(BlobsToForget); - - Self->IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL); } else if (changes->IsTtl()) { - Self->ActiveTtl = false; //TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Ttl() : TBackgroundActivity::None(); // Do not start new TTL till we evict current PortionsToEvict. We could evict them twice otherwise Y_VERIFY(!Self->ActiveEvictions, "Unexpected active evictions count at tablet %lu", Self->TabletID()); Self->ActiveEvictions = ExportTierBlobs.size(); - - Self->IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL); - Self->IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten); } - Self->UpdateResourceMetrics(ctx, Ev->Get()->ResourceUsage); + Self->FinishWriteIndex(ctx, Ev, ok, blobsWritten, bytesWritten); return true; } @@ -299,7 +265,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxWriteIndex.Complete at tablet " << Self->TabletID()); - if (Ev->Get()->PutStatus == NKikimrProto::TRYLATER) { + if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) { ctx.Schedule(Self->FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true)); } else { Self->EnqueueBackgroundActivities(false, TriggerActivity); @@ -317,18 +283,64 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Self->ForgetBlobs(ctx, BlobsToForget); } +void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWriteIndex::TPtr& ev, + bool ok, ui64 blobsWritten, ui64 bytesWritten) { + auto changes = ev->Get()->IndexChanges; + Y_VERIFY(changes); + + TablesManager.MutablePrimaryIndex().FreeLocks(changes); + + if (changes->IsInsert()) { + ActiveIndexing = false; + + IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL); + IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten); + IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten); + IncCounter(COUNTER_INDEXING_TIME, ev->Get()->Duration.MilliSeconds()); + } else if (changes->IsCompaction()) { + ActiveCompaction--; + + Y_VERIFY(changes->CompactionInfo); + bool inGranule = changes->CompactionInfo->InGranule(); + + if (inGranule) { + IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL); + IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten); + IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten); + } else { + IncCounter(ok ? COUNTER_SPLIT_COMPACTION_SUCCESS : COUNTER_SPLIT_COMPACTION_FAIL); + IncCounter(COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, blobsWritten); + IncCounter(COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, bytesWritten); + } + IncCounter(COUNTER_COMPACTION_TIME, ev->Get()->Duration.MilliSeconds()); + } else if (changes->IsCleanup()) { + ActiveCleanup = false; + + IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL); + } else if (changes->IsTtl()) { + ActiveTtl = false; + + IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL); + IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten); + IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten); + } + + UpdateResourceMetrics(ctx, ev->Get()->ResourceUsage); +} void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { - auto& blobs = ev->Get()->Blobs; + auto putStatus = ev->Get()->GetPutStatus(); - if (ev->Get()->PutStatus == NKikimrProto::UNKNOWN) { + if (putStatus == NKikimrProto::UNKNOWN) { if (IsAnyChannelYellowStop()) { LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID()); IncCounter(COUNTER_OUT_OF_SPACE); - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWriteIndex(this, ev), ctx); + ev->Get()->SetPutStatus(NKikimrProto::TRYLATER); + FinishWriteIndex(ctx, ev); + ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true)); } else { + auto& blobs = ev->Get()->Blobs; LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID()); Y_VERIFY(!blobs.empty()); @@ -336,13 +348,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } } else { - if (ev->Get()->PutStatus == NKikimrProto::OK) { - LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID()); + if (putStatus == NKikimrProto::OK) { + LOG_S_DEBUG("WriteIndex at tablet " << TabletID()); } else { LOG_S_INFO("WriteIndex error at tablet " << TabletID()); } - OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); + OnYellowChannels(*ev->Get()); Execute(new TTxWriteIndex(this, ev), ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 0cdc3955fbe..48677da1f90 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -899,7 +899,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { #endif auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false); - ev->PutStatus = NKikimrProto::OK; // No new blobs to write + ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write ActiveCleanup = true; return ev; diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 0f91676649c..e83d109f13d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -195,10 +195,8 @@ class TColumnShard return Executor()->GetStats().IsAnyChannelYellowMove; } - void OnYellowChannels(TVector<ui32>&& yellowMove, TVector<ui32>&& yellowStop) { - if (yellowMove.size() || yellowStop.size()) { - Executor()->OnYellowChannels(std::move(yellowMove), std::move(yellowStop)); - } + void OnYellowChannels(TPutStatus& putStatus) { + putStatus.OnYellowChannels(Executor()); } void SetCounter(NColumnShard::ESimpleCounters counter, ui64 num) const { @@ -384,7 +382,7 @@ private: const TScanCounters ScanCounters; const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation"); const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); - + THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; @@ -458,6 +456,8 @@ private: void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); + void FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWriteIndex::TPtr& ev, + bool ok = false, ui64 blobsWritten = 0, ui64 bytesWritten = 0); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); TActorId GetS3ActorForTier(const TString& tierId) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId, diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 05a23aca572..eb653f00d81 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -25,8 +25,7 @@ struct TEvPrivate { static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); /// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction. - struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { - NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN; + struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex>, public TPutStatus { NOlap::TVersionedIndex IndexInfo; THashMap<ui64, NKikimr::NOlap::TTiering> Tiering; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; @@ -35,8 +34,6 @@ struct TEvPrivate { bool GranuleCompaction{false}; TBlobBatch BlobBatch; TUsage ResourceUsage; - TVector<ui32> YellowMoveChannels; - TVector<ui32> YellowStopChannels; bool CacheData{false}; TDuration Duration; @@ -112,12 +109,12 @@ struct TEvPrivate { } } } else { - TxEvent->PutStatus = NKikimrProto::OK; + TxEvent->SetPutStatus(NKikimrProto::OK); } } bool NeedDataReadWrite() const { - return (TxEvent->PutStatus != NKikimrProto::OK); + return (TxEvent->GetPutStatus() != NKikimrProto::OK); } }; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 65d6f87c876..3915f92e327 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -76,9 +76,10 @@ public: LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (compaction)"); - TxEvent->PutStatus = event.Status; - if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { - TxEvent->PutStatus = NKikimrProto::ERROR; + if (event.Status == NKikimrProto::UNKNOWN) { + TxEvent->SetPutStatus(NKikimrProto::ERROR); + } else { + TxEvent->SetPutStatus(event.Status); } } @@ -135,7 +136,7 @@ private: void CompactGranules(const TActorContext& ctx) { Y_VERIFY(TxEvent); - if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) { + if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) { LOG_S_INFO("Granules compaction not started at tablet " << TabletId); ctx.Send(Parent, TxEvent.release()); return; @@ -151,7 +152,7 @@ private: TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult()); } if (TxEvent->Blobs.empty()) { - TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit + TxEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit } } TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h index 30a248242b1..7717ea64a70 100644 --- a/ydb/core/tx/columnshard/defs.h +++ b/ydb/core/tx/columnshard/defs.h @@ -140,4 +140,36 @@ public: } }; +class TPutStatus { +public: + NKikimrProto::EReplyStatus GetPutStatus() const { + return PutStatus; + } + + void SetPutStatus(NKikimrProto::EReplyStatus status) { + PutStatus = status; + } + + void SetPutStatus(NKikimrProto::EReplyStatus status, + THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) { + PutStatus = status; + YellowMoveChannels = std::move(yellowMoveChannels); + YellowStopChannels = std::move(yellowStopChannels); + } + + template <typename T> + void OnYellowChannels(T* executor) { + if (YellowMoveChannels.size() || YellowStopChannels.size()) { + executor->OnYellowChannels( + TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end()), + TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end())); + } + } + +private: + NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN; + THashSet<ui32> YellowMoveChannels; + THashSet<ui32> YellowStopChannels; +}; + } diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index 865fda62be5..5b3f1a5da69 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -67,9 +67,10 @@ public: LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (eviction)"); - TxEvent->PutStatus = event.Status; - if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { - TxEvent->PutStatus = NKikimrProto::ERROR; + if (event.Status == NKikimrProto::UNKNOWN) { + TxEvent->SetPutStatus(NKikimrProto::ERROR); + } else { + TxEvent->SetPutStatus(event.Status); } } @@ -121,7 +122,7 @@ private: void EvictPortions(const TActorContext& ctx) { Y_VERIFY(TxEvent); - if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) { + if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) { LOG_S_INFO("Portions eviction not started at tablet " << TabletId); ctx.Send(Parent, TxEvent.release()); return; @@ -135,7 +136,7 @@ private: NOlap::TEvictionLogic evictionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); TxEvent->Blobs = std::move(evictionLogic.Apply(TxEvent->IndexChanges).DetachResult()); if (TxEvent->Blobs.empty()) { - TxEvent->PutStatus = NKikimrProto::OK; + TxEvent->SetPutStatus(NKikimrProto::OK); } } ui32 blobsSize = TxEvent->Blobs.size(); diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 1527c231103..5d48679f002 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -62,9 +62,10 @@ public: << " at tablet " << TabletId << " (index)"); BlobsToRead.erase(blobId); - TxEvent->PutStatus = event.Status; - if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { - TxEvent->PutStatus = NKikimrProto::ERROR; + if (event.Status == NKikimrProto::UNKNOWN) { + TxEvent->SetPutStatus(NKikimrProto::ERROR); + } else { + TxEvent->SetPutStatus(event.Status); } return; } @@ -125,7 +126,7 @@ private: void Index(const TActorContext& ctx) { Y_VERIFY(TxEvent); - if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { + if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) { LOG_S_DEBUG("Indexing started at tablet " << TabletId); TCpuGuard guard(TxEvent->ResourceUsage); diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index df5f91931ed..7ef9b70e4ba 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -33,8 +33,6 @@ public: Y_VERIFY(!WriteEv || !WriteIndexEv); } - // TODO: CheckYellow - void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) { TEvBlobStorage::TEvPutResult* msg = ev->Get(); auto status = msg->Status; @@ -46,12 +44,10 @@ public: YellowStopChannels.insert(msg->Id.Channel()); } - if (status != NKikimrProto::OK) { LOG_S_ERROR("Unsuccessful TEvPutResult for blob " << msg->Id.ToString() << " status: " << status << " reason: " << msg->ErrorReason); - SendResultAndDie(ctx, status); - return; + return SendResultAndDie(ctx, status); } LOG_S_TRACE("TEvPutResult for blob " << msg->Id.ToString()); @@ -59,7 +55,7 @@ public: BlobBatch.OnBlobWriteResult(ev); if (BlobBatch.AllBlobWritesCompleted()) { - SendResultAndDie(ctx, NKikimrProto::OK); + return SendResultAndDie(ctx, NKikimrProto::OK); } } @@ -86,8 +82,7 @@ public: if (Deadline != TInstant::Max()) { TInstant now = TAppData::TimeProvider->Now(); if (Deadline <= now) { - SendResultAndDie(ctx, NKikimrProto::TIMEOUT); - return; + return SendResultAndDie(ctx, NKikimrProto::TIMEOUT); } const TDuration timeout = Deadline - now; @@ -103,7 +98,7 @@ public: } void SendWriteRequest(const TActorContext& ctx) { - Y_VERIFY(WriteEv->PutStatus == NKikimrProto::UNKNOWN); + Y_VERIFY(WriteEv->GetPutStatus() == NKikimrProto::UNKNOWN); auto& record = Proto(WriteEv.Get()); ui64 pathId = record.GetTableId(); @@ -114,7 +109,7 @@ public: meta = record.GetMeta().GetSchema(); if (meta.empty() || record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { LOG_S_INFO("Bad metadata for writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId); - SendResultAndDie(ctx, NKikimrProto::ERROR); + return SendResultAndDie(ctx, NKikimrProto::ERROR); } } @@ -128,8 +123,7 @@ public: if (!batch) { LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId << " (" << strError << ") at tablet " << TabletId); - SendResultAndDie(ctx, NKikimrProto::ERROR); - return; + return SendResultAndDie(ctx, NKikimrProto::ERROR); } TString data; @@ -142,8 +136,7 @@ public: << srcData.size() << " bytes) and limit, writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId); - SendResultAndDie(ctx, NKikimrProto::ERROR); - return; + return SendResultAndDie(ctx, NKikimrProto::ERROR); } record.SetData(data); // modify for TxWrite @@ -161,8 +154,7 @@ public: if (!outMeta.SerializeToString(&meta)) { LOG_S_ERROR("Canot set metadata for blob, writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId); - SendResultAndDie(ctx, NKikimrProto::ERROR); - return; + return SendResultAndDie(ctx, NKikimrProto::ERROR); } } record.MutableMeta()->SetLogicalMeta(meta); @@ -181,13 +173,13 @@ public: << " at tablet " << TabletId); if (BlobBatch.AllBlobWritesCompleted()) { - SendResultAndDie(ctx, NKikimrProto::OK); + return SendResultAndDie(ctx, NKikimrProto::OK); } } void SendMultiWriteRequest(const TActorContext& ctx) { Y_VERIFY(WriteIndexEv); - Y_VERIFY(WriteIndexEv->PutStatus == NKikimrProto::UNKNOWN); + Y_VERIFY(WriteIndexEv->GetPutStatus() == NKikimrProto::UNKNOWN); auto indexChanges = WriteIndexEv->IndexChanges; LOG_S_DEBUG("Writing " << WriteIndexEv->Blobs.size() << " blobs at " << TabletId); @@ -306,17 +298,12 @@ private: void SendResult(const TActorContext& ctx, NKikimrProto::EReplyStatus status) { SaveResourceUsage(); if (WriteEv) { - LOG_S_DEBUG("Written " << WriteEv->BlobId.ToStringNew() << " Status: " << status); - WriteEv->PutStatus = status; + WriteEv->SetPutStatus(status, std::move(YellowMoveChannels), std::move(YellowStopChannels)); WriteEv->BlobBatch = std::move(BlobBatch); - WriteEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end()); - WriteEv->YellowStopChannels = TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end()); ctx.Send(DstActor, WriteEv.Release()); } else { - WriteIndexEv->PutStatus = status; + WriteIndexEv->SetPutStatus(status, std::move(YellowMoveChannels), std::move(YellowStopChannels)); WriteIndexEv->BlobBatch = std::move(BlobBatch); - WriteIndexEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end()); - WriteIndexEv->YellowStopChannels = TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end()); ctx.Send(DstActor, WriteIndexEv.Release()); } } |