diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-31 11:22:49 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-31 11:22:49 +0300 |
commit | f81ed92a57f366855191aed00ea789816f5a6c69 (patch) | |
tree | 065e171cfc45d5795ff80cf4e4032ffdf7116551 | |
parent | 72dd5424aab89989d74273b6ab089b4d43156eca (diff) | |
download | ydb-f81ed92a57f366855191aed00ea789816f5a6c69.tar.gz |
Add ValueVersion logic
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_range.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 19 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_decommit.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 20 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 2 |
7 files changed, 56 insertions, 4 deletions
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 2e366a0a6c1..495d9f09f45 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -18,10 +18,12 @@ namespace NKikimr::NBlobDepot { switch (ev->GetTypeRewrite()) { case TEvBlobStorage::EvGet: doForward = ev->Get<TEvBlobStorage::TEvGet>()->Decommission; + Y_VERIFY(!doForward || !ev->Get<TEvBlobStorage::TEvGet>()->MustRestoreFirst); break; case TEvBlobStorage::EvRange: doForward = ev->Get<TEvBlobStorage::TEvRange>()->Decommission; + Y_VERIFY(!doForward || !ev->Get<TEvBlobStorage::TEvRange>()->MustRestoreFirst); break; } diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 4c958f3367e..719a09f7574 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -59,7 +59,6 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); item->SetExactKey(Reads[tag].Id.AsBinaryString()); - item->SetTabletId(Request.TabletId); item->SetMustRestoreFirst(Request.MustRestoreFirst); item->SetCookie(tag); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index a19c348c383..52edebf5f02 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -92,6 +92,10 @@ namespace NKikimr::NBlobDepot { const bool wasUncertain = value.IsWrittenUncertainly(); const bool wasGoingToAssimilate = value.GoingToAssimilate; +#ifndef NDEBUG + TValue originalValue(value); +#endif + if (!inserted) { EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { const auto it = RefCount.find(id); @@ -104,6 +108,11 @@ namespace NKikimr::NBlobDepot { EUpdateOutcome outcome = callback(value, inserted); +#ifndef NDEBUG + Y_VERIFY(outcome != EUpdateOutcome::NO_CHANGE || !value.Changed(originalValue)); + Y_VERIFY(value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain)); +#endif + if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) { outcome = EUpdateOutcome::DROP; } @@ -215,6 +224,7 @@ namespace NKikimr::NBlobDepot { auto *chain = value.ValueChain.Add(); auto *locator = chain->MutableLocator(); locator->CopyFrom(item.GetBlobLocator()); + ++value.ValueVersion; // clear assimilation flag -- we have blob overwritten with fresh copy (of the same data) value.GoingToAssimilate = false; @@ -245,6 +255,7 @@ namespace NKikimr::NBlobDepot { locator->SetTotalDataLen(key.GetBlobId().BlobSize()); locator->SetFooterLen(0); value.GoingToAssimilate = false; + ++value.ValueVersion; outcome = EUpdateOutcome::CHANGE; } return outcome; diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index af9b1e3d143..34751756455 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -262,21 +262,24 @@ namespace NKikimr::NBlobDepot { EKeepState KeepState = EKeepState::Default; bool Public = false; bool GoingToAssimilate = false; + ui32 ValueVersion = 0; bool UncertainWrite = false; TValue() = default; - TValue(const TValue&) = delete; TValue(TValue&&) = default; TValue& operator =(const TValue&) = delete; TValue& operator =(TValue&&) = default; + explicit TValue(const TValue&) = default; + explicit TValue(NKikimrBlobDepot::TValue&& proto, bool uncertainWrite) : Meta(proto.GetMeta()) , ValueChain(std::move(*proto.MutableValueChain())) , KeepState(proto.GetKeepState()) , Public(proto.GetPublic()) , GoingToAssimilate(proto.GetGoingToAssimilate()) + , ValueVersion(proto.GetValueVersion()) , UncertainWrite(uncertainWrite) {} @@ -316,6 +319,9 @@ namespace NKikimr::NBlobDepot { if (GoingToAssimilate != proto->GetGoingToAssimilate()) { proto->SetGoingToAssimilate(GoingToAssimilate); } + if (ValueVersion != proto->GetValueVersion()) { + proto->SetValueVersion(ValueVersion); + } } static bool Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item); @@ -342,9 +348,20 @@ namespace NKikimr::NBlobDepot { << " KeepState# " << EKeepState_Name(KeepState) << " Public# " << (Public ? "true" : "false") << " GoingToAssimilate# " << (GoingToAssimilate ? "true" : "false") + << " ValueVersion# " << ValueVersion << " UncertainWrite# " << (UncertainWrite ? "true" : "false") << "}"; } + + bool Changed(const TValue& other) const { + return Meta != other.Meta || + !IsSameValueChain(ValueChain, other.ValueChain) || + KeepState != other.KeepState || + Public != other.Public || + GoingToAssimilate != other.GoingToAssimilate || + ValueVersion != other.ValueVersion || + UncertainWrite != other.UncertainWrite; + } }; enum EScanFlags : ui32 { diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index f7df4860fda..15a45ecb8aa 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -35,7 +35,7 @@ namespace NKikimr::NBlobDepot { void Bootstrap() { if (Token.expired()) { - return; + return PassAway(); } STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()), @@ -49,6 +49,8 @@ namespace NKikimr::NBlobDepot { } else { Handle(Ev); } + + Become(&TThis::StateFunc); } void Handle(TEvBlobDepot::TEvResolve::TPtr ev) { @@ -114,7 +116,6 @@ namespace NKikimr::NBlobDepot { } } - Become(&TThis::StateFunc); CheckIfDone(); } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 72714424d63..ac33792de65 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -170,6 +170,26 @@ namespace NKikimr::NBlobDepot { } } + inline bool IsSameValueChain(const TValueChain& x, const TValueChain& y) { + if (x.size() != y.size()) { + return false; + } + for (int i = 0; i < x.size(); ++i) { + TString a; + bool success = x[i].SerializeToString(&a); + Y_VERIFY(success); + + TString b; + success = y[i].SerializeToString(&b); + Y_VERIFY(success); + + if (a != b) { + return false; + } + } + return true; + } + class TGenStep { ui64 Value = 0; diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 8b0292a03f8..488858ccc19 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -37,6 +37,7 @@ message TValue { optional EKeepState KeepState = 3; optional bool Public = 4; optional bool GoingToAssimilate = 5; + optional uint32 ValueVersion = 6; } message TGivenIdRange { @@ -220,6 +221,7 @@ message TEvResolveResult { optional bytes Meta = 4; repeated uint64 Owners = 5; optional string ErrorReason = 6; // if set, this means value wasn't resolved due to error + optional uint32 ValueVersion = 7; // ValueChain version, gets increased every time value is changed } optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way optional string ErrorReason = 2; |