aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-31 11:22:49 +0300
committeralexvru <alexvru@ydb.tech>2023-01-31 11:22:49 +0300
commitf81ed92a57f366855191aed00ea789816f5a6c69 (patch)
tree065e171cfc45d5795ff80cf4e4032ffdf7116551
parent72dd5424aab89989d74273b6ab089b4d43156eca (diff)
downloadydb-f81ed92a57f366855191aed00ea789816f5a6c69.tar.gz
Add ValueVersion logic
-rw-r--r--ydb/core/blob_depot/agent/query.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp1
-rw-r--r--ydb/core/blob_depot/data.cpp11
-rw-r--r--ydb/core/blob_depot/data.h19
-rw-r--r--ydb/core/blob_depot/data_decommit.cpp5
-rw-r--r--ydb/core/blob_depot/types.h20
-rw-r--r--ydb/core/protos/blob_depot.proto2
7 files changed, 56 insertions, 4 deletions
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 2e366a0a6c1..495d9f09f45 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -18,10 +18,12 @@ namespace NKikimr::NBlobDepot {
switch (ev->GetTypeRewrite()) {
case TEvBlobStorage::EvGet:
doForward = ev->Get<TEvBlobStorage::TEvGet>()->Decommission;
+ Y_VERIFY(!doForward || !ev->Get<TEvBlobStorage::TEvGet>()->MustRestoreFirst);
break;
case TEvBlobStorage::EvRange:
doForward = ev->Get<TEvBlobStorage::TEvRange>()->Decommission;
+ Y_VERIFY(!doForward || !ev->Get<TEvBlobStorage::TEvRange>()->MustRestoreFirst);
break;
}
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 4c958f3367e..719a09f7574 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -59,7 +59,6 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvResolve resolve;
auto *item = resolve.AddItems();
item->SetExactKey(Reads[tag].Id.AsBinaryString());
- item->SetTabletId(Request.TabletId);
item->SetMustRestoreFirst(Request.MustRestoreFirst);
item->SetCookie(tag);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index a19c348c383..52edebf5f02 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -92,6 +92,10 @@ namespace NKikimr::NBlobDepot {
const bool wasUncertain = value.IsWrittenUncertainly();
const bool wasGoingToAssimilate = value.GoingToAssimilate;
+#ifndef NDEBUG
+ TValue originalValue(value);
+#endif
+
if (!inserted) {
EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
const auto it = RefCount.find(id);
@@ -104,6 +108,11 @@ namespace NKikimr::NBlobDepot {
EUpdateOutcome outcome = callback(value, inserted);
+#ifndef NDEBUG
+ Y_VERIFY(outcome != EUpdateOutcome::NO_CHANGE || !value.Changed(originalValue));
+ Y_VERIFY(value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain));
+#endif
+
if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) {
outcome = EUpdateOutcome::DROP;
}
@@ -215,6 +224,7 @@ namespace NKikimr::NBlobDepot {
auto *chain = value.ValueChain.Add();
auto *locator = chain->MutableLocator();
locator->CopyFrom(item.GetBlobLocator());
+ ++value.ValueVersion;
// clear assimilation flag -- we have blob overwritten with fresh copy (of the same data)
value.GoingToAssimilate = false;
@@ -245,6 +255,7 @@ namespace NKikimr::NBlobDepot {
locator->SetTotalDataLen(key.GetBlobId().BlobSize());
locator->SetFooterLen(0);
value.GoingToAssimilate = false;
+ ++value.ValueVersion;
outcome = EUpdateOutcome::CHANGE;
}
return outcome;
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index af9b1e3d143..34751756455 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -262,21 +262,24 @@ namespace NKikimr::NBlobDepot {
EKeepState KeepState = EKeepState::Default;
bool Public = false;
bool GoingToAssimilate = false;
+ ui32 ValueVersion = 0;
bool UncertainWrite = false;
TValue() = default;
- TValue(const TValue&) = delete;
TValue(TValue&&) = default;
TValue& operator =(const TValue&) = delete;
TValue& operator =(TValue&&) = default;
+ explicit TValue(const TValue&) = default;
+
explicit TValue(NKikimrBlobDepot::TValue&& proto, bool uncertainWrite)
: Meta(proto.GetMeta())
, ValueChain(std::move(*proto.MutableValueChain()))
, KeepState(proto.GetKeepState())
, Public(proto.GetPublic())
, GoingToAssimilate(proto.GetGoingToAssimilate())
+ , ValueVersion(proto.GetValueVersion())
, UncertainWrite(uncertainWrite)
{}
@@ -316,6 +319,9 @@ namespace NKikimr::NBlobDepot {
if (GoingToAssimilate != proto->GetGoingToAssimilate()) {
proto->SetGoingToAssimilate(GoingToAssimilate);
}
+ if (ValueVersion != proto->GetValueVersion()) {
+ proto->SetValueVersion(ValueVersion);
+ }
}
static bool Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item);
@@ -342,9 +348,20 @@ namespace NKikimr::NBlobDepot {
<< " KeepState# " << EKeepState_Name(KeepState)
<< " Public# " << (Public ? "true" : "false")
<< " GoingToAssimilate# " << (GoingToAssimilate ? "true" : "false")
+ << " ValueVersion# " << ValueVersion
<< " UncertainWrite# " << (UncertainWrite ? "true" : "false")
<< "}";
}
+
+ bool Changed(const TValue& other) const {
+ return Meta != other.Meta ||
+ !IsSameValueChain(ValueChain, other.ValueChain) ||
+ KeepState != other.KeepState ||
+ Public != other.Public ||
+ GoingToAssimilate != other.GoingToAssimilate ||
+ ValueVersion != other.ValueVersion ||
+ UncertainWrite != other.UncertainWrite;
+ }
};
enum EScanFlags : ui32 {
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index f7df4860fda..15a45ecb8aa 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -35,7 +35,7 @@ namespace NKikimr::NBlobDepot {
void Bootstrap() {
if (Token.expired()) {
- return;
+ return PassAway();
}
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()),
@@ -49,6 +49,8 @@ namespace NKikimr::NBlobDepot {
} else {
Handle(Ev);
}
+
+ Become(&TThis::StateFunc);
}
void Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
@@ -114,7 +116,6 @@ namespace NKikimr::NBlobDepot {
}
}
- Become(&TThis::StateFunc);
CheckIfDone();
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 72714424d63..ac33792de65 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -170,6 +170,26 @@ namespace NKikimr::NBlobDepot {
}
}
+ inline bool IsSameValueChain(const TValueChain& x, const TValueChain& y) {
+ if (x.size() != y.size()) {
+ return false;
+ }
+ for (int i = 0; i < x.size(); ++i) {
+ TString a;
+ bool success = x[i].SerializeToString(&a);
+ Y_VERIFY(success);
+
+ TString b;
+ success = y[i].SerializeToString(&b);
+ Y_VERIFY(success);
+
+ if (a != b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
class TGenStep {
ui64 Value = 0;
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 8b0292a03f8..488858ccc19 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -37,6 +37,7 @@ message TValue {
optional EKeepState KeepState = 3;
optional bool Public = 4;
optional bool GoingToAssimilate = 5;
+ optional uint32 ValueVersion = 6;
}
message TGivenIdRange {
@@ -220,6 +221,7 @@ message TEvResolveResult {
optional bytes Meta = 4;
repeated uint64 Owners = 5;
optional string ErrorReason = 6; // if set, this means value wasn't resolved due to error
+ optional uint32 ValueVersion = 7; // ValueChain version, gets increased every time value is changed
}
optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way
optional string ErrorReason = 2;