aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-06-01 17:14:34 +0300
committerchertus <azuikov@ydb.tech>2023-06-01 17:14:34 +0300
commit0fa4c2d7731ee14f0d45501830d5d66377530e26 (patch)
treeb3b06bdfc2d477858530b6230f9eb9ce2e9ac253
parent308587966b506c17c0a15004ee2181bfc4a5cb37 (diff)
downloadydb-0fa4c2d7731ee14f0d45501830d5d66377530e26.tar.gz
better yellow flags check in ColumnShard
-rw-r--r--ydb/core/tx/columnshard/columnshard.h6
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp102
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h10
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h9
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp11
-rw-r--r--ydb/core/tx/columnshard/defs.h32
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp11
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp9
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp37
11 files changed, 132 insertions, 103 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 096537cc12c..4ec86e165b4 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -228,7 +228,8 @@ struct TEvColumnShard {
}
};
- struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> {
+ struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite>
+ , public NColumnShard::TPutStatus {
TEvWrite() = default;
TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId,
@@ -262,13 +263,10 @@ struct TEvColumnShard {
return ActorIdFromProto(Record.GetSource());
}
- NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
NColumnShard::TUnifiedBlobId BlobId;
std::shared_ptr<arrow::RecordBatch> WrittenBatch;
NColumnShard::TBlobBatch BlobBatch;
NColumnShard::TUsage ResourceUsage;
- TVector<ui32> YellowMoveChannels;
- TVector<ui32> YellowStopChannels;
ui64 MaxSmallBlobSize;
};
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 33ee2942e37..0dcb1a12e56 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -44,7 +44,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
auto& logoBlobId = Ev->Get()->BlobId;
- auto putStatus = Ev->Get()->PutStatus;
+ auto putStatus = Ev->Get()->GetPutStatus();
Y_VERIFY(putStatus == NKikimrProto::OK);
Y_VERIFY(logoBlobId.IsValid());
@@ -129,7 +129,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
LastAccessTime = TAppData::TimeProvider->Now();
- OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
+ OnYellowChannels(*ev->Get());
auto& record = Proto(ev->Get());
auto& data = record.GetData();
@@ -137,7 +137,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
ui64 metaShard = record.GetTxInitiator();
ui64 writeId = record.GetWriteId();
TString dedupId = record.GetDedupId();
- auto putStatus = ev->Get()->PutStatus;
+ auto putStatus = ev->Get()->GetPutStatus();
bool isWritable = TablesManager.IsWritableTable(tableId);
bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable;
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 427259b209a..82754ad0a72 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -58,7 +58,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
<< ") changes: " << *changes << " at tablet " << Self->TabletID());
bool ok = false;
- if (Ev->Get()->PutStatus == NKikimrProto::OK) {
+ if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
@@ -246,52 +246,18 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
}
- Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes);
-
- if (changes->IsInsert()) {
- Self->ActiveIndexing = false;
-
- Self->IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL);
- Self->IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten);
- Self->IncCounter(COUNTER_INDEXING_TIME, Ev->Get()->Duration.MilliSeconds());
- } else if (changes->IsCompaction()) {
- Self->ActiveCompaction--;
-
- Y_VERIFY(changes->CompactionInfo);
- bool inGranule = changes->CompactionInfo->InGranule();
-
- if (inGranule) {
- Self->IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL);
- Self->IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten);
- } else {
- Self->IncCounter(ok ? COUNTER_SPLIT_COMPACTION_SUCCESS : COUNTER_SPLIT_COMPACTION_FAIL);
- Self->IncCounter(COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, bytesWritten);
- }
- Self->IncCounter(COUNTER_COMPACTION_TIME, Ev->Get()->Duration.MilliSeconds());
- } else if (changes->IsCleanup()) {
- Self->ActiveCleanup = false;
+ if (changes->IsCleanup()) {
TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Cleanup() : TBackgroundActivity::None();
-
Self->BlobManager->GetCleanupBlobs(BlobsToForget);
-
- Self->IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL);
} else if (changes->IsTtl()) {
- Self->ActiveTtl = false;
//TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Ttl() : TBackgroundActivity::None();
// Do not start new TTL till we evict current PortionsToEvict. We could evict them twice otherwise
Y_VERIFY(!Self->ActiveEvictions, "Unexpected active evictions count at tablet %lu", Self->TabletID());
Self->ActiveEvictions = ExportTierBlobs.size();
-
- Self->IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL);
- Self->IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten);
}
- Self->UpdateResourceMetrics(ctx, Ev->Get()->ResourceUsage);
+ Self->FinishWriteIndex(ctx, Ev, ok, blobsWritten, bytesWritten);
return true;
}
@@ -299,7 +265,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Y_VERIFY(Ev);
LOG_S_DEBUG("TTxWriteIndex.Complete at tablet " << Self->TabletID());
- if (Ev->Get()->PutStatus == NKikimrProto::TRYLATER) {
+ if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) {
ctx.Schedule(Self->FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
Self->EnqueueBackgroundActivities(false, TriggerActivity);
@@ -317,18 +283,64 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Self->ForgetBlobs(ctx, BlobsToForget);
}
+void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWriteIndex::TPtr& ev,
+ bool ok, ui64 blobsWritten, ui64 bytesWritten) {
+ auto changes = ev->Get()->IndexChanges;
+ Y_VERIFY(changes);
+
+ TablesManager.MutablePrimaryIndex().FreeLocks(changes);
+
+ if (changes->IsInsert()) {
+ ActiveIndexing = false;
+
+ IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL);
+ IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten);
+ IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten);
+ IncCounter(COUNTER_INDEXING_TIME, ev->Get()->Duration.MilliSeconds());
+ } else if (changes->IsCompaction()) {
+ ActiveCompaction--;
+
+ Y_VERIFY(changes->CompactionInfo);
+ bool inGranule = changes->CompactionInfo->InGranule();
+
+ if (inGranule) {
+ IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL);
+ IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten);
+ IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten);
+ } else {
+ IncCounter(ok ? COUNTER_SPLIT_COMPACTION_SUCCESS : COUNTER_SPLIT_COMPACTION_FAIL);
+ IncCounter(COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, blobsWritten);
+ IncCounter(COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, bytesWritten);
+ }
+ IncCounter(COUNTER_COMPACTION_TIME, ev->Get()->Duration.MilliSeconds());
+ } else if (changes->IsCleanup()) {
+ ActiveCleanup = false;
+
+ IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL);
+ } else if (changes->IsTtl()) {
+ ActiveTtl = false;
+
+ IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL);
+ IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten);
+ IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten);
+ }
+
+ UpdateResourceMetrics(ctx, ev->Get()->ResourceUsage);
+}
void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
- auto& blobs = ev->Get()->Blobs;
+ auto putStatus = ev->Get()->GetPutStatus();
- if (ev->Get()->PutStatus == NKikimrProto::UNKNOWN) {
+ if (putStatus == NKikimrProto::UNKNOWN) {
if (IsAnyChannelYellowStop()) {
LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID());
IncCounter(COUNTER_OUT_OF_SPACE);
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWriteIndex(this, ev), ctx);
+ ev->Get()->SetPutStatus(NKikimrProto::TRYLATER);
+ FinishWriteIndex(ctx, ev);
+ ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
+ auto& blobs = ev->Get()->Blobs;
LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID());
Y_VERIFY(!blobs.empty());
@@ -336,13 +348,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
} else {
- if (ev->Get()->PutStatus == NKikimrProto::OK) {
- LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID());
+ if (putStatus == NKikimrProto::OK) {
+ LOG_S_DEBUG("WriteIndex at tablet " << TabletID());
} else {
LOG_S_INFO("WriteIndex error at tablet " << TabletID());
}
- OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
+ OnYellowChannels(*ev->Get());
Execute(new TTxWriteIndex(this, ev), ctx);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 0cdc3955fbe..48677da1f90 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -899,7 +899,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
#endif
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
- ev->PutStatus = NKikimrProto::OK; // No new blobs to write
+ ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
ActiveCleanup = true;
return ev;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 0f91676649c..e83d109f13d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -195,10 +195,8 @@ class TColumnShard
return Executor()->GetStats().IsAnyChannelYellowMove;
}
- void OnYellowChannels(TVector<ui32>&& yellowMove, TVector<ui32>&& yellowStop) {
- if (yellowMove.size() || yellowStop.size()) {
- Executor()->OnYellowChannels(std::move(yellowMove), std::move(yellowStop));
- }
+ void OnYellowChannels(TPutStatus& putStatus) {
+ putStatus.OnYellowChannels(Executor());
}
void SetCounter(NColumnShard::ESimpleCounters counter, ui64 num) const {
@@ -384,7 +382,7 @@ private:
const TScanCounters ScanCounters;
const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation");
const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
-
+
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
@@ -458,6 +456,8 @@ private:
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
+ void FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWriteIndex::TPtr& ev,
+ bool ok = false, ui64 blobsWritten = 0, ui64 bytesWritten = 0);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
TActorId GetS3ActorForTier(const TString& tierId) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId,
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 05a23aca572..eb653f00d81 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -25,8 +25,7 @@ struct TEvPrivate {
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
- struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
- NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
+ struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex>, public TPutStatus {
NOlap::TVersionedIndex IndexInfo;
THashMap<ui64, NKikimr::NOlap::TTiering> Tiering;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
@@ -35,8 +34,6 @@ struct TEvPrivate {
bool GranuleCompaction{false};
TBlobBatch BlobBatch;
TUsage ResourceUsage;
- TVector<ui32> YellowMoveChannels;
- TVector<ui32> YellowStopChannels;
bool CacheData{false};
TDuration Duration;
@@ -112,12 +109,12 @@ struct TEvPrivate {
}
}
} else {
- TxEvent->PutStatus = NKikimrProto::OK;
+ TxEvent->SetPutStatus(NKikimrProto::OK);
}
}
bool NeedDataReadWrite() const {
- return (TxEvent->PutStatus != NKikimrProto::OK);
+ return (TxEvent->GetPutStatus() != NKikimrProto::OK);
}
};
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 65d6f87c876..3915f92e327 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -76,9 +76,10 @@ public:
LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob "
<< blobId.ToString() << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet "
<< TabletId << " (compaction)");
- TxEvent->PutStatus = event.Status;
- if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
- TxEvent->PutStatus = NKikimrProto::ERROR;
+ if (event.Status == NKikimrProto::UNKNOWN) {
+ TxEvent->SetPutStatus(NKikimrProto::ERROR);
+ } else {
+ TxEvent->SetPutStatus(event.Status);
}
}
@@ -135,7 +136,7 @@ private:
void CompactGranules(const TActorContext& ctx) {
Y_VERIFY(TxEvent);
- if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) {
+ if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) {
LOG_S_INFO("Granules compaction not started at tablet " << TabletId);
ctx.Send(Parent, TxEvent.release());
return;
@@ -151,7 +152,7 @@ private:
TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult());
}
if (TxEvent->Blobs.empty()) {
- TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit
+ TxEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit
}
}
TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 30a248242b1..7717ea64a70 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -140,4 +140,36 @@ public:
}
};
+class TPutStatus {
+public:
+ NKikimrProto::EReplyStatus GetPutStatus() const {
+ return PutStatus;
+ }
+
+ void SetPutStatus(NKikimrProto::EReplyStatus status) {
+ PutStatus = status;
+ }
+
+ void SetPutStatus(NKikimrProto::EReplyStatus status,
+ THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) {
+ PutStatus = status;
+ YellowMoveChannels = std::move(yellowMoveChannels);
+ YellowStopChannels = std::move(yellowStopChannels);
+ }
+
+ template <typename T>
+ void OnYellowChannels(T* executor) {
+ if (YellowMoveChannels.size() || YellowStopChannels.size()) {
+ executor->OnYellowChannels(
+ TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end()),
+ TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end()));
+ }
+ }
+
+private:
+ NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
+ THashSet<ui32> YellowMoveChannels;
+ THashSet<ui32> YellowStopChannels;
+};
+
}
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index 865fda62be5..5b3f1a5da69 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -67,9 +67,10 @@ public:
LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString()
<< " status " << NKikimrProto::EReplyStatus_Name(event.Status)
<< " at tablet " << TabletId << " (eviction)");
- TxEvent->PutStatus = event.Status;
- if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
- TxEvent->PutStatus = NKikimrProto::ERROR;
+ if (event.Status == NKikimrProto::UNKNOWN) {
+ TxEvent->SetPutStatus(NKikimrProto::ERROR);
+ } else {
+ TxEvent->SetPutStatus(event.Status);
}
}
@@ -121,7 +122,7 @@ private:
void EvictPortions(const TActorContext& ctx) {
Y_VERIFY(TxEvent);
- if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) {
+ if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) {
LOG_S_INFO("Portions eviction not started at tablet " << TabletId);
ctx.Send(Parent, TxEvent.release());
return;
@@ -135,7 +136,7 @@ private:
NOlap::TEvictionLogic evictionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
TxEvent->Blobs = std::move(evictionLogic.Apply(TxEvent->IndexChanges).DetachResult());
if (TxEvent->Blobs.empty()) {
- TxEvent->PutStatus = NKikimrProto::OK;
+ TxEvent->SetPutStatus(NKikimrProto::OK);
}
}
ui32 blobsSize = TxEvent->Blobs.size();
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 1527c231103..5d48679f002 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -62,9 +62,10 @@ public:
<< " at tablet " << TabletId << " (index)");
BlobsToRead.erase(blobId);
- TxEvent->PutStatus = event.Status;
- if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
- TxEvent->PutStatus = NKikimrProto::ERROR;
+ if (event.Status == NKikimrProto::UNKNOWN) {
+ TxEvent->SetPutStatus(NKikimrProto::ERROR);
+ } else {
+ TxEvent->SetPutStatus(event.Status);
}
return;
}
@@ -125,7 +126,7 @@ private:
void Index(const TActorContext& ctx) {
Y_VERIFY(TxEvent);
- if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
+ if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) {
LOG_S_DEBUG("Indexing started at tablet " << TabletId);
TCpuGuard guard(TxEvent->ResourceUsage);
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index df5f91931ed..7ef9b70e4ba 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -33,8 +33,6 @@ public:
Y_VERIFY(!WriteEv || !WriteIndexEv);
}
- // TODO: CheckYellow
-
void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) {
TEvBlobStorage::TEvPutResult* msg = ev->Get();
auto status = msg->Status;
@@ -46,12 +44,10 @@ public:
YellowStopChannels.insert(msg->Id.Channel());
}
-
if (status != NKikimrProto::OK) {
LOG_S_ERROR("Unsuccessful TEvPutResult for blob " << msg->Id.ToString()
<< " status: " << status << " reason: " << msg->ErrorReason);
- SendResultAndDie(ctx, status);
- return;
+ return SendResultAndDie(ctx, status);
}
LOG_S_TRACE("TEvPutResult for blob " << msg->Id.ToString());
@@ -59,7 +55,7 @@ public:
BlobBatch.OnBlobWriteResult(ev);
if (BlobBatch.AllBlobWritesCompleted()) {
- SendResultAndDie(ctx, NKikimrProto::OK);
+ return SendResultAndDie(ctx, NKikimrProto::OK);
}
}
@@ -86,8 +82,7 @@ public:
if (Deadline != TInstant::Max()) {
TInstant now = TAppData::TimeProvider->Now();
if (Deadline <= now) {
- SendResultAndDie(ctx, NKikimrProto::TIMEOUT);
- return;
+ return SendResultAndDie(ctx, NKikimrProto::TIMEOUT);
}
const TDuration timeout = Deadline - now;
@@ -103,7 +98,7 @@ public:
}
void SendWriteRequest(const TActorContext& ctx) {
- Y_VERIFY(WriteEv->PutStatus == NKikimrProto::UNKNOWN);
+ Y_VERIFY(WriteEv->GetPutStatus() == NKikimrProto::UNKNOWN);
auto& record = Proto(WriteEv.Get());
ui64 pathId = record.GetTableId();
@@ -114,7 +109,7 @@ public:
meta = record.GetMeta().GetSchema();
if (meta.empty() || record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
LOG_S_INFO("Bad metadata for writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId);
- SendResultAndDie(ctx, NKikimrProto::ERROR);
+ return SendResultAndDie(ctx, NKikimrProto::ERROR);
}
}
@@ -128,8 +123,7 @@ public:
if (!batch) {
LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId
<< " (" << strError << ") at tablet " << TabletId);
- SendResultAndDie(ctx, NKikimrProto::ERROR);
- return;
+ return SendResultAndDie(ctx, NKikimrProto::ERROR);
}
TString data;
@@ -142,8 +136,7 @@ public:
<< srcData.size() << " bytes) and limit, writeId " << writeId << " pathId " << pathId
<< " at tablet " << TabletId);
- SendResultAndDie(ctx, NKikimrProto::ERROR);
- return;
+ return SendResultAndDie(ctx, NKikimrProto::ERROR);
}
record.SetData(data); // modify for TxWrite
@@ -161,8 +154,7 @@ public:
if (!outMeta.SerializeToString(&meta)) {
LOG_S_ERROR("Canot set metadata for blob, writeId " << writeId << " pathId " << pathId
<< " at tablet " << TabletId);
- SendResultAndDie(ctx, NKikimrProto::ERROR);
- return;
+ return SendResultAndDie(ctx, NKikimrProto::ERROR);
}
}
record.MutableMeta()->SetLogicalMeta(meta);
@@ -181,13 +173,13 @@ public:
<< " at tablet " << TabletId);
if (BlobBatch.AllBlobWritesCompleted()) {
- SendResultAndDie(ctx, NKikimrProto::OK);
+ return SendResultAndDie(ctx, NKikimrProto::OK);
}
}
void SendMultiWriteRequest(const TActorContext& ctx) {
Y_VERIFY(WriteIndexEv);
- Y_VERIFY(WriteIndexEv->PutStatus == NKikimrProto::UNKNOWN);
+ Y_VERIFY(WriteIndexEv->GetPutStatus() == NKikimrProto::UNKNOWN);
auto indexChanges = WriteIndexEv->IndexChanges;
LOG_S_DEBUG("Writing " << WriteIndexEv->Blobs.size() << " blobs at " << TabletId);
@@ -306,17 +298,12 @@ private:
void SendResult(const TActorContext& ctx, NKikimrProto::EReplyStatus status) {
SaveResourceUsage();
if (WriteEv) {
- LOG_S_DEBUG("Written " << WriteEv->BlobId.ToStringNew() << " Status: " << status);
- WriteEv->PutStatus = status;
+ WriteEv->SetPutStatus(status, std::move(YellowMoveChannels), std::move(YellowStopChannels));
WriteEv->BlobBatch = std::move(BlobBatch);
- WriteEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end());
- WriteEv->YellowStopChannels = TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end());
ctx.Send(DstActor, WriteEv.Release());
} else {
- WriteIndexEv->PutStatus = status;
+ WriteIndexEv->SetPutStatus(status, std::move(YellowMoveChannels), std::move(YellowStopChannels));
WriteIndexEv->BlobBatch = std::move(BlobBatch);
- WriteIndexEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end());
- WriteIndexEv->YellowStopChannels = TVector<ui32>(YellowStopChannels.begin(), YellowStopChannels.end());
ctx.Send(DstActor, WriteIndexEv.Release());
}
}