diff options
author | Nikolay Shumkov <shumkovnd@ydb.tech> | 2025-04-15 17:17:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-15 14:17:33 +0000 |
commit | ea212ccacb42f706accf08bfcfcf6af4793408c3 (patch) | |
tree | 86b3b4744d2278548fc6aa0db440979bb60054fd | |
parent | 03e4e829e111195898215179219e7eb7e5ac0ca0 (diff) | |
download | ydb-ea212ccacb42f706accf08bfcfcf6af4793408c3.tar.gz |
Support GetSequence in SequenceProxy (#17075)
-rw-r--r-- | ydb/core/tx/sequenceproxy/public/events.h | 56 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp | 133 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp | 107 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.h | 34 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp | 90 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/ya.make | 1 |
6 files changed, 407 insertions, 14 deletions
diff --git a/ydb/core/tx/sequenceproxy/public/events.h b/ydb/core/tx/sequenceproxy/public/events.h index a046f1b019c..aa63f48c8c0 100644 --- a/ydb/core/tx/sequenceproxy/public/events.h +++ b/ydb/core/tx/sequenceproxy/public/events.h @@ -21,6 +21,8 @@ namespace NSequenceProxy { EvNextValResult, EvSetVal, EvSetValResult, + EvGetSequence, + EvGetSequenceResult, EvEnd, }; @@ -70,6 +72,60 @@ namespace NSequenceProxy { , Value(value) { } }; + + struct TEvGetSequence : public TEventLocal<TEvGetSequence, EvGetSequence> { + TString Database; + TPathId PathId; + TIntrusivePtr<NACLib::TUserToken> UserToken; + + explicit TEvGetSequence(const TPathId& pathId) + : PathId(pathId) + { } + + TEvGetSequence(const TString& database, const TPathId& pathId) + : Database(database) + , PathId(pathId) + { } + }; + + struct TEvGetSequenceResult : public TEventLocal<TEvGetSequenceResult, EvGetSequenceResult> { + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TPathId PathId; + + i64 MinValue; + i64 MaxValue; + i64 StartValue; + i64 NextValue; + bool NextUsed; + ui64 Cache; + i64 Increment; + bool Cycle; + + TEvGetSequenceResult(const TPathId& pathId) + : Status(Ydb::StatusIds::SUCCESS) + , PathId(pathId) + { } + + TEvGetSequenceResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) + : Status(status) + , Issues(issues) + { } + + explicit TEvGetSequenceResult(const TEvGetSequenceResult& ev) + : Status(ev.Status) + , Issues(ev.Issues) + , PathId(ev.PathId) + , MinValue(ev.MinValue) + , MaxValue(ev.MaxValue) + , StartValue(ev.StartValue) + , NextValue(ev.NextValue) + , NextUsed(ev.NextUsed) + , Cache(ev.Cache) + , Increment(ev.Increment) + , Cycle(ev.Cycle) + { } + }; }; } // namespace NSequenceProxy diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp new file mode 100644 index 00000000000..0c268651ac8 --- /dev/null +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp @@ -0,0 +1,133 @@ +#include "sequenceproxy_impl.h" + +#include <ydb/library/ydb_issue/issue_helpers.h> +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/tx/sequenceshard/public/events.h> +#include <yql/essentials/public/issue/yql_issue_manager.h> + +#include <ydb/library/actors/core/log.h> +#include <util/string/builder.h> + +#define TXLOG_LOG(priority, stream) \ + LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream) +#define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream) +#define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream) +#define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream) + +namespace NKikimr { +namespace NSequenceProxy { + + using namespace NKikimr::NSequenceShard; + + class TSequenceProxy::TGetSequenceActor : public TActorBootstrapped<TGetSequenceActor> { + public: + TGetSequenceActor(const TActorId& owner, ui64 cookie, ui64 tabletId, const TPathId& pathId) + : Owner(owner) + , Cookie(cookie) + , PipeCache(MakePipePerNodeCacheID(true)) + , TabletId(tabletId) + , PathId(pathId) + { } + + void Bootstrap() { + Send(PipeCache, + new TEvPipeCache::TEvForward( + new TEvSequenceShard::TEvGetSequence(PathId), + TabletId, + true), + IEventHandle::FlagTrackDelivery); + Become(&TThis::StateGetSequence); + } + + void PassAway() override { + Send(PipeCache, new TEvPipeCache::TEvUnlink(0)); + TActorBootstrapped::PassAway(); + } + + void ReplyAndDie(Ydb::StatusIds::StatusCode status, NKikimrIssues::TIssuesIds::EIssueCode code, const TString& message) { + NYql::TIssueManager issueManager; + issueManager.RaiseIssue(MakeIssue(code, message)); + auto res = MakeHolder<TEvSequenceProxy::TEvGetSequenceResult>(status, issueManager.GetIssues()); + Send(Owner, res.Release(), 0, Cookie); + PassAway(); + } + + private: + STFUNC(StateGetSequence) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSequenceShard::TEvGetSequenceResult, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } + + void Handle(TEvSequenceShard::TEvGetSequenceResult::TPtr& ev) { + auto* msg = ev->Get(); + switch (msg->Record.GetStatus()) { + case NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS: + break; + + case NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_NOT_FOUND: + return ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR, + NKikimrIssues::TIssuesIds::PATH_NOT_EXIST, + "The specified sequence no longer exists"); + + case NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_MOVED: + // Switch to a different tablet and retry transparently + TabletId = msg->Record.GetMovedTo(); + return Bootstrap(); + + default: + return ReplyAndDie(Ydb::StatusIds::INTERNAL_ERROR, + NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR, + TStringBuilder() + << "Unexpected error from sequence shard " << TabletId); + } + + auto res = MakeHolder<TEvSequenceProxy::TEvGetSequenceResult>(PathId); + res->MinValue = msg->Record.GetMinValue(); + res->MaxValue = msg->Record.GetMaxValue(); + res->StartValue = msg->Record.GetStartValue(); + res->NextValue = msg->Record.GetNextValue(); + res->NextUsed = msg->Record.GetNextUsed(); + res->Cache = msg->Record.GetCache(); + res->Increment = msg->Record.GetIncrement(); + res->Cycle = msg->Record.GetCycle(); + Send(Owner, res.Release(), 0, Cookie); + PassAway(); + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + ui64 tabletId = ev->Get()->TabletId; + if (tabletId == TabletId) { + const TString error = TStringBuilder() + << "sequence shard " << tabletId << " is unavailable"; + return ReplyAndDie(Ydb::StatusIds::UNAVAILABLE, NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, error); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr&) { + const TString error = "Pipe cache may not be configured correctly"; + return ReplyAndDie(Ydb::StatusIds::UNAVAILABLE, NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR, error); + } + + private: + const TActorId Owner; + const ui64 Cookie; + const TActorId PipeCache; + ui64 TabletId; + TPathId PathId; + NYql::TIssueManager IssueManager; + }; + + ui64 TSequenceProxy::StartGetSequence(ui64 tabletId, const TString& database, const TPathId& pathId) { + ui64 cookie = ++LastCookie; + auto& info = GetSequenceInFlight[cookie]; + info.Database = database; + info.PathId = pathId; + Register(new TGetSequenceActor(SelfId(), cookie, tabletId, pathId)); + return cookie; + } + +} // namespace NSequenceProxy +} // namespace NKikimr diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp index 6bc5a0997e2..233046dd247 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp @@ -53,6 +53,16 @@ namespace NSequenceProxy { msg->Path); } + void TSequenceProxy::Handle(TEvSequenceProxy::TEvGetSequence::TPtr& ev) { + auto* msg = ev->Get(); + TGetSequenceRequestInfo request; + request.Sender = ev->Sender; + request.Cookie = ev->Cookie; + request.UserToken = std::move(msg->UserToken); + request.StartAt = AppData()->MonotonicTimeProvider->Now(); + DoGetSequence(std::move(request), msg->Database, msg->PathId); + } + void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { Counters->ResponseCount->Inc(); auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds(); @@ -68,6 +78,17 @@ namespace NSequenceProxy { Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie); } + void TSequenceProxy::Reply(const TGetSequenceRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { + Counters->ResponseCount->Inc(); + Counters->ErrorsCount->Inc(); + Send(request.Sender, new TEvSequenceProxy::TEvGetSequenceResult(status, issues), 0, request.Cookie); + } + + void TSequenceProxy::Reply(const TGetSequenceRequestInfo& request, const TEvSequenceProxy::TEvGetSequenceResult& ev) { + Counters->ResponseCount->Inc(); + Send(request.Sender, new TEvSequenceProxy::TEvGetSequenceResult(ev), 0, request.Cookie); + } + void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) { if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) { info.PendingNextValResolve = std::move(info.NewNextValResolve); @@ -111,6 +132,28 @@ namespace NSequenceProxy { OnChanged(database, pathId, info); } + void TSequenceProxy::DoGetSequence(TGetSequenceRequestInfo&& request, const TString& database, const TPathId& pathId) { + Counters->RequestCount->Inc(); + + auto& info = Databases[database].SequenceByPathId[pathId]; + if (!info.ResolveInProgress) { + StartResolve(database, pathId, !info.SequenceInfo); + info.ResolveInProgress = true; + } + if (!info.SequenceInfo) { + info.PendingGetSequenceResolve.emplace_back(std::move(request)); + return; + } + + if (DoMaybeReplyUnauthorized(request, pathId, info)) { + return; + } + + info.PendingGetSequence.emplace_back(std::move(request)); + + OnChanged(database, pathId, info); + } + void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { auto& info = Databases[database].SequenceByName[path]; Y_ABORT_UNLESS(info.ResolveInProgress); @@ -139,7 +182,9 @@ namespace NSequenceProxy { auto& infoById = Databases[database].SequenceByPathId[pathId]; infoById.SequenceInfo = result.SequenceInfo; infoById.SecurityObject = result.SecurityObject; - OnResolved(database, pathId, infoById, info.PendingNextValResolve); + + TList<TGetSequenceRequestInfo> resolvedGetSequence; + OnResolved(database, pathId, infoById, info.PendingNextValResolve, resolvedGetSequence); MaybeStartResolve(database, path, info); } @@ -153,6 +198,11 @@ namespace NSequenceProxy { Reply(info.PendingNextValResolve.front(), status, issues); info.PendingNextValResolve.pop_front(); } + + while (!info.PendingGetSequenceResolve.empty()) { + Reply(info.PendingGetSequenceResolve.front(), status, issues); + info.PendingGetSequenceResolve.pop_front(); + } } void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result) { @@ -163,20 +213,29 @@ namespace NSequenceProxy { Y_ABORT_UNLESS(result.SequenceInfo); info.SequenceInfo = result.SequenceInfo; info.SecurityObject = result.SecurityObject; - OnResolved(database, pathId, info, info.PendingNextValResolve); + OnResolved(database, pathId, info, info.PendingNextValResolve, info.PendingGetSequenceResolve); } - void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved) { + void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, + TList<TNextValRequestInfo>& resolvedNextVal, TList<TGetSequenceRequestInfo>& resolvedGetSequence) { info.LastKnownTabletId = info.SequenceInfo->Description.GetSequenceShard(); info.DefaultCacheSize = Max(info.SequenceInfo->Description.GetCache(), ui64(1)); - while (!resolved.empty()) { - auto& request = resolved.front(); + while (!resolvedNextVal.empty()) { + auto& request = resolvedNextVal.front(); if (!DoMaybeReplyUnauthorized(request, pathId, info)) { info.PendingNextVal.emplace_back(std::move(request)); ++info.TotalRequested; } - resolved.pop_front(); + resolvedNextVal.pop_front(); + } + + while (!resolvedGetSequence.empty()) { + auto& request = resolvedGetSequence.front(); + if (!DoMaybeReplyUnauthorized(request, pathId, info)) { + info.PendingGetSequence.emplace_back(std::move(request)); + } + resolvedGetSequence.pop_front(); } OnChanged(database, pathId, info); @@ -215,6 +274,34 @@ namespace NSequenceProxy { OnChanged(database, pathId, info); } + void TSequenceProxy::Handle(TEvSequenceProxy::TEvGetSequenceResult::TPtr& ev) { + auto it = GetSequenceInFlight.find(ev->Cookie); + Y_ABORT_UNLESS(it != GetSequenceInFlight.end()); + auto database = it->second.Database; + auto pathId = it->second.PathId; + GetSequenceInFlight.erase(it); + + auto& info = Databases[database].SequenceByPathId[pathId]; + Y_ABORT_UNLESS(info.GetSequenceInProgress); + info.GetSequenceInProgress = false; + + auto* msg = ev->Get(); + + if (msg->Status == Ydb::StatusIds::SUCCESS) { + while (!info.PendingGetSequence.empty()) { + Reply(info.PendingGetSequence.front(), *msg); + info.PendingGetSequence.pop_front(); + } + } else { + while (!info.PendingGetSequence.empty()) { + Reply(info.PendingGetSequence.front(), msg->Status, msg->Issues); + info.PendingGetSequence.pop_front(); + } + } + + OnChanged(database, pathId, info); + } + void TSequenceProxy::OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info) { while (info.TotalCached > 0 && !info.PendingNextVal.empty()) { const auto& request = info.PendingNextVal.front(); @@ -230,9 +317,15 @@ namespace NSequenceProxy { info.AllocateInProgress = true; info.TotalAllocating += cache; } + + if (!info.PendingGetSequence.empty() && !info.GetSequenceInProgress) { + StartGetSequence(info.LastKnownTabletId, database, pathId); + info.GetSequenceInProgress = true; + } } - bool TSequenceProxy::DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info) { + template <class TRequestInfo> + bool TSequenceProxy::DoMaybeReplyUnauthorized(const TRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info) { if (request.UserToken && info.SecurityObject) { ui32 access = NACLib::EAccessRights::SelectRow; if (!info.SecurityObject->CheckAccess(access, *request.UserToken)) { diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h index 6868a82d2b2..f9f7d3074c0 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h @@ -42,13 +42,14 @@ namespace NSequenceProxy { private: class TAllocateActor; + class TGetSequenceActor; using TSequenceInfo = NSchemeCache::TSchemeCacheNavigate::TSequenceInfo; struct TEvPrivate { enum EEv { EvBegin = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), - EvAllocateResult, + EvAllocateResult }; struct TEvAllocateResult : public TEventLocal<TEvAllocateResult, EvAllocateResult> { @@ -76,6 +77,13 @@ namespace NSequenceProxy { TMonotonic StartAt; }; + struct TGetSequenceRequestInfo { + TActorId Sender; + ui64 Cookie; + TIntrusivePtr<NACLib::TUserToken> UserToken; + TMonotonic StartAt; + }; + struct TCachedAllocation { i64 Start, Increment; ui64 Count; @@ -95,10 +103,13 @@ namespace NSequenceProxy { TList<TCachedAllocation> CachedAllocations; TList<TNextValRequestInfo> PendingNextValResolve; TList<TNextValRequestInfo> PendingNextVal; + TList<TGetSequenceRequestInfo> PendingGetSequenceResolve; + TList<TGetSequenceRequestInfo> PendingGetSequence; ui64 LastKnownTabletId = 0; ui64 DefaultCacheSize = 0; bool ResolveInProgress = false; bool AllocateInProgress = false; + bool GetSequenceInProgress = false; ui64 TotalCached = 0; ui64 TotalRequested = 0; ui64 TotalAllocating = 0; @@ -128,11 +139,18 @@ namespace NSequenceProxy { TPathId PathId; }; + struct TGetSequenceInFlight { + TString Database; + TPathId PathId; + }; + private: STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { sFunc(TEvents::TEvPoison, HandlePoison); hFunc(TEvSequenceProxy::TEvNextVal, Handle); + hFunc(TEvSequenceProxy::TEvGetSequence, Handle); + hFunc(TEvSequenceProxy::TEvGetSequenceResult, Handle); hFunc(TEvPrivate::TEvAllocateResult, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); } @@ -140,23 +158,32 @@ namespace NSequenceProxy { void HandlePoison(); void Handle(TEvSequenceProxy::TEvNextVal::TPtr& ev); + void Handle(TEvSequenceProxy::TEvGetSequence::TPtr& ev); void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev); + void Handle(TEvSequenceProxy::TEvGetSequenceResult::TPtr& ev); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value); + void Reply(const TGetSequenceRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + void Reply(const TGetSequenceRequestInfo& request, const TEvSequenceProxy::TEvGetSequenceResult& ev); ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion); ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache); + ui64 StartGetSequence(ui64 tabletId, const TString& database, const TPathId& pathId); void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true); + void DoGetSequence(TGetSequenceRequestInfo&& request, const TString& database, const TPathId& pathId); void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result); void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result); - void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved); + void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolvedNextVal, TList<TGetSequenceRequestInfo>& resolvedGetSequence); void OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info); - bool DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info); + + template <class TRequestInfo> + bool DoMaybeReplyUnauthorized(const TRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info); + bool DoReplyFromCache(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info); private: @@ -165,6 +192,7 @@ namespace NSequenceProxy { THashMap<TString, TDatabaseState> Databases; THashMap<ui64, TResolveInFlight> ResolveInFlight; THashMap<ui64, TAllocateInFlight> AllocateInFlight; + THashMap<ui64, TGetSequenceInFlight> GetSequenceInFlight; ui64 LastCookie = 0; TIntrusivePtr<TSequenceProxyCounters> Counters; }; diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp index 50931d8935c..0400d38032d 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp @@ -126,11 +126,31 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge); } + TPathId DescribeSequence(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) { + TAutoPtr<IEventHandle> handle; + + auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>(); + request->Record.MutableDescribePath()->SetPath(path); + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(handle); + + UNIT_ASSERT(reply->GetRecord().GetPathDescription().HasSequenceDescription()); + + const auto& sequenceDescription = reply->GetRecord().GetPathDescription().GetSequenceDescription(); + + return TPathId::FromProto(sequenceDescription.GetPathId()); + } + void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) { auto request = MakeHolder<TEvSequenceProxy::TEvNextVal>(path); runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release())); } + void SendGetSequenceRequest(TTestActorRuntime& runtime, const TActorId& sender, const TPathId& pathId) { + auto request = MakeHolder<TEvSequenceProxy::TEvGetSequence>(pathId); + runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release())); + } + i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvNextValResult>(sender); auto* msg = ev->Get(); @@ -138,33 +158,75 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { return msg->Status == Ydb::StatusIds::SUCCESS ? msg->Value : 0; } + i64 WaitGetSequenceResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { + auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvGetSequenceResult>(sender); + auto* msg = ev->Get(); + if (msg->Status != Ydb::StatusIds::SUCCESS) { + return 0; + } + UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus); + UNIT_ASSERT_VALUES_EQUAL(msg->MinValue, 1); + UNIT_ASSERT_VALUES_EQUAL(msg->MaxValue, 9223372036854775807LL); + UNIT_ASSERT_VALUES_EQUAL(msg->StartValue, 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Cache, 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Increment, 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Cycle, false); + UNIT_ASSERT_VALUES_EQUAL(msg->NextUsed, false); + return msg->NextValue; + } + i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { auto sender = runtime.AllocateEdgeActor(0); SendNextValRequest(runtime, sender, path); return WaitNextValResult(runtime, sender, expectedStatus); } + i64 DoGetSequence(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { + auto sender = runtime.AllocateEdgeActor(0); + auto pathId = DescribeSequence(runtime, sender, path); + SendGetSequenceRequest(runtime, sender, pathId); + return WaitGetSequenceResult(runtime, sender, expectedStatus); + } + } // namespace Y_UNIT_TEST(Basics) { TTenantTestRuntime runtime(MakeTenantTestConfig(false)); StartSchemeCache(runtime); + auto sender = runtime.AllocateEdgeActor(0); CreateSequence(runtime, "/dc-1", R"( Name: "seq" )"); - i64 value = DoNextVal(runtime, "/dc-1/seq"); + i64 value = DoGetSequence(runtime, "/dc-1/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 1); + + value = DoNextVal(runtime, "/dc-1/seq"); UNIT_ASSERT_VALUES_EQUAL(value, 1); + value = DoGetSequence(runtime, "/dc-1/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 2); + + value = DoGetSequence(runtime, "/dc-1/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 2); + + DoNextVal(runtime, "/dc-1/noseq", Ydb::StatusIds::SCHEME_ERROR); + SendGetSequenceRequest(runtime, sender, TPathId()); + WaitGetSequenceResult(runtime, sender, Ydb::StatusIds::SCHEME_ERROR); + ui64 allocateEvents = 0; + ui64 getSequenceEvents = 0; auto observerFunc = [&](auto& ev) { switch (ev->GetTypeRewrite()) { case TEvSequenceShard::TEvAllocateSequence::EventType: ++allocateEvents; break; + case TEvSequenceShard::TEvGetSequence::EventType: + ++getSequenceEvents; + break; default: break; @@ -174,16 +236,36 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { }; auto prevObserver = runtime.SetObserverFunc(observerFunc); - auto sender = runtime.AllocateEdgeActor(0); + auto pathId = DescribeSequence(runtime, sender, "/dc-1/seq"); + for (int i = 0; i < 7; ++i) { SendNextValRequest(runtime, sender, "/dc-1/seq"); } for (int i = 0; i < 7; ++i) { - i64 value = WaitNextValResult(runtime, sender); + value = WaitNextValResult(runtime, sender); UNIT_ASSERT_VALUES_EQUAL(value, 2 + i); } + UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvSequenceShard::TEvAllocateSequence events: " << allocateEvents); + + for (int i = 0; i < 7; ++i) { + SendGetSequenceRequest(runtime, sender, pathId); + } + for (int i = 0; i < 7; ++i) { + value = WaitGetSequenceResult(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(value, 9); + } + UNIT_ASSERT_C(getSequenceEvents < 7, "Too many TEvSequenceShard::TEvGetSequence events: " << getSequenceEvents); + ui64 prevGetSequenceEvents = getSequenceEvents; + + for (int i = 0; i < 7; ++i) { + SendNextValRequest(runtime, sender, "/dc-1/seq"); + SendGetSequenceRequest(runtime, sender, pathId); + } + for (int i = 0; i < 7; ++i) { + WaitGetSequenceResult(runtime, sender); + } - UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvAllocateSequence events: " << allocateEvents); + UNIT_ASSERT_C(getSequenceEvents < 7 + prevGetSequenceEvents, "Too many TEvSequenceShard::TEvGetSequence events: " << getSequenceEvents - prevGetSequenceEvents); } Y_UNIT_TEST(DropRecreate) { diff --git a/ydb/core/tx/sequenceproxy/ya.make b/ydb/core/tx/sequenceproxy/ya.make index 4b904b7427b..02988d04bc1 100644 --- a/ydb/core/tx/sequenceproxy/ya.make +++ b/ydb/core/tx/sequenceproxy/ya.make @@ -10,6 +10,7 @@ PEERDIR( SRCS( sequenceproxy.cpp sequenceproxy_allocate.cpp + sequenceproxy_get_sequence.cpp sequenceproxy_impl.cpp sequenceproxy_resolve.cpp ) |