aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shumkov <shumkovnd@ydb.tech>2025-04-15 17:17:33 +0300
committerGitHub <noreply@github.com>2025-04-15 14:17:33 +0000
commitea212ccacb42f706accf08bfcfcf6af4793408c3 (patch)
tree86b3b4744d2278548fc6aa0db440979bb60054fd
parent03e4e829e111195898215179219e7eb7e5ac0ca0 (diff)
downloadydb-ea212ccacb42f706accf08bfcfcf6af4793408c3.tar.gz
Support GetSequence in SequenceProxy (#17075)
-rw-r--r--ydb/core/tx/sequenceproxy/public/events.h56
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp133
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp107
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_impl.h34
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp90
-rw-r--r--ydb/core/tx/sequenceproxy/ya.make1
6 files changed, 407 insertions, 14 deletions
diff --git a/ydb/core/tx/sequenceproxy/public/events.h b/ydb/core/tx/sequenceproxy/public/events.h
index a046f1b019c..aa63f48c8c0 100644
--- a/ydb/core/tx/sequenceproxy/public/events.h
+++ b/ydb/core/tx/sequenceproxy/public/events.h
@@ -21,6 +21,8 @@ namespace NSequenceProxy {
EvNextValResult,
EvSetVal,
EvSetValResult,
+ EvGetSequence,
+ EvGetSequenceResult,
EvEnd,
};
@@ -70,6 +72,60 @@ namespace NSequenceProxy {
, Value(value)
{ }
};
+
+ struct TEvGetSequence : public TEventLocal<TEvGetSequence, EvGetSequence> {
+ TString Database;
+ TPathId PathId;
+ TIntrusivePtr<NACLib::TUserToken> UserToken;
+
+ explicit TEvGetSequence(const TPathId& pathId)
+ : PathId(pathId)
+ { }
+
+ TEvGetSequence(const TString& database, const TPathId& pathId)
+ : Database(database)
+ , PathId(pathId)
+ { }
+ };
+
+ struct TEvGetSequenceResult : public TEventLocal<TEvGetSequenceResult, EvGetSequenceResult> {
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+ TPathId PathId;
+
+ i64 MinValue;
+ i64 MaxValue;
+ i64 StartValue;
+ i64 NextValue;
+ bool NextUsed;
+ ui64 Cache;
+ i64 Increment;
+ bool Cycle;
+
+ TEvGetSequenceResult(const TPathId& pathId)
+ : Status(Ydb::StatusIds::SUCCESS)
+ , PathId(pathId)
+ { }
+
+ TEvGetSequenceResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues)
+ : Status(status)
+ , Issues(issues)
+ { }
+
+ explicit TEvGetSequenceResult(const TEvGetSequenceResult& ev)
+ : Status(ev.Status)
+ , Issues(ev.Issues)
+ , PathId(ev.PathId)
+ , MinValue(ev.MinValue)
+ , MaxValue(ev.MaxValue)
+ , StartValue(ev.StartValue)
+ , NextValue(ev.NextValue)
+ , NextUsed(ev.NextUsed)
+ , Cache(ev.Cache)
+ , Increment(ev.Increment)
+ , Cycle(ev.Cycle)
+ { }
+ };
};
} // namespace NSequenceProxy
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp
new file mode 100644
index 00000000000..0c268651ac8
--- /dev/null
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_get_sequence.cpp
@@ -0,0 +1,133 @@
+#include "sequenceproxy_impl.h"
+
+#include <ydb/library/ydb_issue/issue_helpers.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/tx/sequenceshard/public/events.h>
+#include <yql/essentials/public/issue/yql_issue_manager.h>
+
+#include <ydb/library/actors/core/log.h>
+#include <util/string/builder.h>
+
+#define TXLOG_LOG(priority, stream) \
+ LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream)
+#define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream)
+#define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream)
+#define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream)
+
+namespace NKikimr {
+namespace NSequenceProxy {
+
+ using namespace NKikimr::NSequenceShard;
+
+ class TSequenceProxy::TGetSequenceActor : public TActorBootstrapped<TGetSequenceActor> {
+ public:
+ TGetSequenceActor(const TActorId& owner, ui64 cookie, ui64 tabletId, const TPathId& pathId)
+ : Owner(owner)
+ , Cookie(cookie)
+ , PipeCache(MakePipePerNodeCacheID(true))
+ , TabletId(tabletId)
+ , PathId(pathId)
+ { }
+
+ void Bootstrap() {
+ Send(PipeCache,
+ new TEvPipeCache::TEvForward(
+ new TEvSequenceShard::TEvGetSequence(PathId),
+ TabletId,
+ true),
+ IEventHandle::FlagTrackDelivery);
+ Become(&TThis::StateGetSequence);
+ }
+
+ void PassAway() override {
+ Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
+ TActorBootstrapped::PassAway();
+ }
+
+ void ReplyAndDie(Ydb::StatusIds::StatusCode status, NKikimrIssues::TIssuesIds::EIssueCode code, const TString& message) {
+ NYql::TIssueManager issueManager;
+ issueManager.RaiseIssue(MakeIssue(code, message));
+ auto res = MakeHolder<TEvSequenceProxy::TEvGetSequenceResult>(status, issueManager.GetIssues());
+ Send(Owner, res.Release(), 0, Cookie);
+ PassAway();
+ }
+
+ private:
+ STFUNC(StateGetSequence) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSequenceShard::TEvGetSequenceResult, Handle);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ }
+ }
+
+ void Handle(TEvSequenceShard::TEvGetSequenceResult::TPtr& ev) {
+ auto* msg = ev->Get();
+ switch (msg->Record.GetStatus()) {
+ case NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS:
+ break;
+
+ case NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_NOT_FOUND:
+ return ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR,
+ NKikimrIssues::TIssuesIds::PATH_NOT_EXIST,
+ "The specified sequence no longer exists");
+
+ case NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_MOVED:
+ // Switch to a different tablet and retry transparently
+ TabletId = msg->Record.GetMovedTo();
+ return Bootstrap();
+
+ default:
+ return ReplyAndDie(Ydb::StatusIds::INTERNAL_ERROR,
+ NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR,
+ TStringBuilder()
+ << "Unexpected error from sequence shard " << TabletId);
+ }
+
+ auto res = MakeHolder<TEvSequenceProxy::TEvGetSequenceResult>(PathId);
+ res->MinValue = msg->Record.GetMinValue();
+ res->MaxValue = msg->Record.GetMaxValue();
+ res->StartValue = msg->Record.GetStartValue();
+ res->NextValue = msg->Record.GetNextValue();
+ res->NextUsed = msg->Record.GetNextUsed();
+ res->Cache = msg->Record.GetCache();
+ res->Increment = msg->Record.GetIncrement();
+ res->Cycle = msg->Record.GetCycle();
+ Send(Owner, res.Release(), 0, Cookie);
+ PassAway();
+ }
+
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ ui64 tabletId = ev->Get()->TabletId;
+ if (tabletId == TabletId) {
+ const TString error = TStringBuilder()
+ << "sequence shard " << tabletId << " is unavailable";
+ return ReplyAndDie(Ydb::StatusIds::UNAVAILABLE, NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, error);
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr&) {
+ const TString error = "Pipe cache may not be configured correctly";
+ return ReplyAndDie(Ydb::StatusIds::UNAVAILABLE, NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR, error);
+ }
+
+ private:
+ const TActorId Owner;
+ const ui64 Cookie;
+ const TActorId PipeCache;
+ ui64 TabletId;
+ TPathId PathId;
+ NYql::TIssueManager IssueManager;
+ };
+
+ ui64 TSequenceProxy::StartGetSequence(ui64 tabletId, const TString& database, const TPathId& pathId) {
+ ui64 cookie = ++LastCookie;
+ auto& info = GetSequenceInFlight[cookie];
+ info.Database = database;
+ info.PathId = pathId;
+ Register(new TGetSequenceActor(SelfId(), cookie, tabletId, pathId));
+ return cookie;
+ }
+
+} // namespace NSequenceProxy
+} // namespace NKikimr
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
index 6bc5a0997e2..233046dd247 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
@@ -53,6 +53,16 @@ namespace NSequenceProxy {
msg->Path);
}
+ void TSequenceProxy::Handle(TEvSequenceProxy::TEvGetSequence::TPtr& ev) {
+ auto* msg = ev->Get();
+ TGetSequenceRequestInfo request;
+ request.Sender = ev->Sender;
+ request.Cookie = ev->Cookie;
+ request.UserToken = std::move(msg->UserToken);
+ request.StartAt = AppData()->MonotonicTimeProvider->Now();
+ DoGetSequence(std::move(request), msg->Database, msg->PathId);
+ }
+
void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
Counters->ResponseCount->Inc();
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
@@ -68,6 +78,17 @@ namespace NSequenceProxy {
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie);
}
+ void TSequenceProxy::Reply(const TGetSequenceRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
+ Counters->ResponseCount->Inc();
+ Counters->ErrorsCount->Inc();
+ Send(request.Sender, new TEvSequenceProxy::TEvGetSequenceResult(status, issues), 0, request.Cookie);
+ }
+
+ void TSequenceProxy::Reply(const TGetSequenceRequestInfo& request, const TEvSequenceProxy::TEvGetSequenceResult& ev) {
+ Counters->ResponseCount->Inc();
+ Send(request.Sender, new TEvSequenceProxy::TEvGetSequenceResult(ev), 0, request.Cookie);
+ }
+
void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
info.PendingNextValResolve = std::move(info.NewNextValResolve);
@@ -111,6 +132,28 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}
+ void TSequenceProxy::DoGetSequence(TGetSequenceRequestInfo&& request, const TString& database, const TPathId& pathId) {
+ Counters->RequestCount->Inc();
+
+ auto& info = Databases[database].SequenceByPathId[pathId];
+ if (!info.ResolveInProgress) {
+ StartResolve(database, pathId, !info.SequenceInfo);
+ info.ResolveInProgress = true;
+ }
+ if (!info.SequenceInfo) {
+ info.PendingGetSequenceResolve.emplace_back(std::move(request));
+ return;
+ }
+
+ if (DoMaybeReplyUnauthorized(request, pathId, info)) {
+ return;
+ }
+
+ info.PendingGetSequence.emplace_back(std::move(request));
+
+ OnChanged(database, pathId, info);
+ }
+
void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByName[path];
Y_ABORT_UNLESS(info.ResolveInProgress);
@@ -139,7 +182,9 @@ namespace NSequenceProxy {
auto& infoById = Databases[database].SequenceByPathId[pathId];
infoById.SequenceInfo = result.SequenceInfo;
infoById.SecurityObject = result.SecurityObject;
- OnResolved(database, pathId, infoById, info.PendingNextValResolve);
+
+ TList<TGetSequenceRequestInfo> resolvedGetSequence;
+ OnResolved(database, pathId, infoById, info.PendingNextValResolve, resolvedGetSequence);
MaybeStartResolve(database, path, info);
}
@@ -153,6 +198,11 @@ namespace NSequenceProxy {
Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}
+
+ while (!info.PendingGetSequenceResolve.empty()) {
+ Reply(info.PendingGetSequenceResolve.front(), status, issues);
+ info.PendingGetSequenceResolve.pop_front();
+ }
}
void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result) {
@@ -163,20 +213,29 @@ namespace NSequenceProxy {
Y_ABORT_UNLESS(result.SequenceInfo);
info.SequenceInfo = result.SequenceInfo;
info.SecurityObject = result.SecurityObject;
- OnResolved(database, pathId, info, info.PendingNextValResolve);
+ OnResolved(database, pathId, info, info.PendingNextValResolve, info.PendingGetSequenceResolve);
}
- void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved) {
+ void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info,
+ TList<TNextValRequestInfo>& resolvedNextVal, TList<TGetSequenceRequestInfo>& resolvedGetSequence) {
info.LastKnownTabletId = info.SequenceInfo->Description.GetSequenceShard();
info.DefaultCacheSize = Max(info.SequenceInfo->Description.GetCache(), ui64(1));
- while (!resolved.empty()) {
- auto& request = resolved.front();
+ while (!resolvedNextVal.empty()) {
+ auto& request = resolvedNextVal.front();
if (!DoMaybeReplyUnauthorized(request, pathId, info)) {
info.PendingNextVal.emplace_back(std::move(request));
++info.TotalRequested;
}
- resolved.pop_front();
+ resolvedNextVal.pop_front();
+ }
+
+ while (!resolvedGetSequence.empty()) {
+ auto& request = resolvedGetSequence.front();
+ if (!DoMaybeReplyUnauthorized(request, pathId, info)) {
+ info.PendingGetSequence.emplace_back(std::move(request));
+ }
+ resolvedGetSequence.pop_front();
}
OnChanged(database, pathId, info);
@@ -215,6 +274,34 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}
+ void TSequenceProxy::Handle(TEvSequenceProxy::TEvGetSequenceResult::TPtr& ev) {
+ auto it = GetSequenceInFlight.find(ev->Cookie);
+ Y_ABORT_UNLESS(it != GetSequenceInFlight.end());
+ auto database = it->second.Database;
+ auto pathId = it->second.PathId;
+ GetSequenceInFlight.erase(it);
+
+ auto& info = Databases[database].SequenceByPathId[pathId];
+ Y_ABORT_UNLESS(info.GetSequenceInProgress);
+ info.GetSequenceInProgress = false;
+
+ auto* msg = ev->Get();
+
+ if (msg->Status == Ydb::StatusIds::SUCCESS) {
+ while (!info.PendingGetSequence.empty()) {
+ Reply(info.PendingGetSequence.front(), *msg);
+ info.PendingGetSequence.pop_front();
+ }
+ } else {
+ while (!info.PendingGetSequence.empty()) {
+ Reply(info.PendingGetSequence.front(), msg->Status, msg->Issues);
+ info.PendingGetSequence.pop_front();
+ }
+ }
+
+ OnChanged(database, pathId, info);
+ }
+
void TSequenceProxy::OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info) {
while (info.TotalCached > 0 && !info.PendingNextVal.empty()) {
const auto& request = info.PendingNextVal.front();
@@ -230,9 +317,15 @@ namespace NSequenceProxy {
info.AllocateInProgress = true;
info.TotalAllocating += cache;
}
+
+ if (!info.PendingGetSequence.empty() && !info.GetSequenceInProgress) {
+ StartGetSequence(info.LastKnownTabletId, database, pathId);
+ info.GetSequenceInProgress = true;
+ }
}
- bool TSequenceProxy::DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info) {
+ template <class TRequestInfo>
+ bool TSequenceProxy::DoMaybeReplyUnauthorized(const TRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info) {
if (request.UserToken && info.SecurityObject) {
ui32 access = NACLib::EAccessRights::SelectRow;
if (!info.SecurityObject->CheckAccess(access, *request.UserToken)) {
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
index 6868a82d2b2..f9f7d3074c0 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
@@ -42,13 +42,14 @@ namespace NSequenceProxy {
private:
class TAllocateActor;
+ class TGetSequenceActor;
using TSequenceInfo = NSchemeCache::TSchemeCacheNavigate::TSequenceInfo;
struct TEvPrivate {
enum EEv {
EvBegin = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
- EvAllocateResult,
+ EvAllocateResult
};
struct TEvAllocateResult : public TEventLocal<TEvAllocateResult, EvAllocateResult> {
@@ -76,6 +77,13 @@ namespace NSequenceProxy {
TMonotonic StartAt;
};
+ struct TGetSequenceRequestInfo {
+ TActorId Sender;
+ ui64 Cookie;
+ TIntrusivePtr<NACLib::TUserToken> UserToken;
+ TMonotonic StartAt;
+ };
+
struct TCachedAllocation {
i64 Start, Increment;
ui64 Count;
@@ -95,10 +103,13 @@ namespace NSequenceProxy {
TList<TCachedAllocation> CachedAllocations;
TList<TNextValRequestInfo> PendingNextValResolve;
TList<TNextValRequestInfo> PendingNextVal;
+ TList<TGetSequenceRequestInfo> PendingGetSequenceResolve;
+ TList<TGetSequenceRequestInfo> PendingGetSequence;
ui64 LastKnownTabletId = 0;
ui64 DefaultCacheSize = 0;
bool ResolveInProgress = false;
bool AllocateInProgress = false;
+ bool GetSequenceInProgress = false;
ui64 TotalCached = 0;
ui64 TotalRequested = 0;
ui64 TotalAllocating = 0;
@@ -128,11 +139,18 @@ namespace NSequenceProxy {
TPathId PathId;
};
+ struct TGetSequenceInFlight {
+ TString Database;
+ TPathId PathId;
+ };
+
private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, HandlePoison);
hFunc(TEvSequenceProxy::TEvNextVal, Handle);
+ hFunc(TEvSequenceProxy::TEvGetSequence, Handle);
+ hFunc(TEvSequenceProxy::TEvGetSequenceResult, Handle);
hFunc(TEvPrivate::TEvAllocateResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
}
@@ -140,23 +158,32 @@ namespace NSequenceProxy {
void HandlePoison();
void Handle(TEvSequenceProxy::TEvNextVal::TPtr& ev);
+ void Handle(TEvSequenceProxy::TEvGetSequence::TPtr& ev);
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
+ void Handle(TEvSequenceProxy::TEvGetSequenceResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value);
+ void Reply(const TGetSequenceRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
+ void Reply(const TGetSequenceRequestInfo& request, const TEvSequenceProxy::TEvGetSequenceResult& ev);
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
+ ui64 StartGetSequence(ui64 tabletId, const TString& database, const TPathId& pathId);
void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
+ void DoGetSequence(TGetSequenceRequestInfo&& request, const TString& database, const TPathId& pathId);
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
- void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
+ void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolvedNextVal, TList<TGetSequenceRequestInfo>& resolvedGetSequence);
void OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info);
- bool DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info);
+
+ template <class TRequestInfo>
+ bool DoMaybeReplyUnauthorized(const TRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info);
+
bool DoReplyFromCache(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info);
private:
@@ -165,6 +192,7 @@ namespace NSequenceProxy {
THashMap<TString, TDatabaseState> Databases;
THashMap<ui64, TResolveInFlight> ResolveInFlight;
THashMap<ui64, TAllocateInFlight> AllocateInFlight;
+ THashMap<ui64, TGetSequenceInFlight> GetSequenceInFlight;
ui64 LastCookie = 0;
TIntrusivePtr<TSequenceProxyCounters> Counters;
};
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
index 50931d8935c..0400d38032d 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
@@ -126,11 +126,31 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge);
}
+ TPathId DescribeSequence(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) {
+ TAutoPtr<IEventHandle> handle;
+
+ auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
+ request->Record.MutableDescribePath()->SetPath(path);
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+ auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(handle);
+
+ UNIT_ASSERT(reply->GetRecord().GetPathDescription().HasSequenceDescription());
+
+ const auto& sequenceDescription = reply->GetRecord().GetPathDescription().GetSequenceDescription();
+
+ return TPathId::FromProto(sequenceDescription.GetPathId());
+ }
+
void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) {
auto request = MakeHolder<TEvSequenceProxy::TEvNextVal>(path);
runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release()));
}
+ void SendGetSequenceRequest(TTestActorRuntime& runtime, const TActorId& sender, const TPathId& pathId) {
+ auto request = MakeHolder<TEvSequenceProxy::TEvGetSequence>(pathId);
+ runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release()));
+ }
+
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
@@ -138,33 +158,75 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
return msg->Status == Ydb::StatusIds::SUCCESS ? msg->Value : 0;
}
+ i64 WaitGetSequenceResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvGetSequenceResult>(sender);
+ auto* msg = ev->Get();
+ if (msg->Status != Ydb::StatusIds::SUCCESS) {
+ return 0;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
+ UNIT_ASSERT_VALUES_EQUAL(msg->MinValue, 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->MaxValue, 9223372036854775807LL);
+ UNIT_ASSERT_VALUES_EQUAL(msg->StartValue, 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Cache, 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Increment, 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Cycle, false);
+ UNIT_ASSERT_VALUES_EQUAL(msg->NextUsed, false);
+ return msg->NextValue;
+ }
+
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender, expectedStatus);
}
+ i64 DoGetSequence(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
+ auto sender = runtime.AllocateEdgeActor(0);
+ auto pathId = DescribeSequence(runtime, sender, path);
+ SendGetSequenceRequest(runtime, sender, pathId);
+ return WaitGetSequenceResult(runtime, sender, expectedStatus);
+ }
+
} // namespace
Y_UNIT_TEST(Basics) {
TTenantTestRuntime runtime(MakeTenantTestConfig(false));
StartSchemeCache(runtime);
+ auto sender = runtime.AllocateEdgeActor(0);
CreateSequence(runtime, "/dc-1", R"(
Name: "seq"
)");
- i64 value = DoNextVal(runtime, "/dc-1/seq");
+ i64 value = DoGetSequence(runtime, "/dc-1/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 1);
+
+ value = DoNextVal(runtime, "/dc-1/seq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);
+ value = DoGetSequence(runtime, "/dc-1/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 2);
+
+ value = DoGetSequence(runtime, "/dc-1/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 2);
+
+
DoNextVal(runtime, "/dc-1/noseq", Ydb::StatusIds::SCHEME_ERROR);
+ SendGetSequenceRequest(runtime, sender, TPathId());
+ WaitGetSequenceResult(runtime, sender, Ydb::StatusIds::SCHEME_ERROR);
+
ui64 allocateEvents = 0;
+ ui64 getSequenceEvents = 0;
auto observerFunc = [&](auto& ev) {
switch (ev->GetTypeRewrite()) {
case TEvSequenceShard::TEvAllocateSequence::EventType:
++allocateEvents;
break;
+ case TEvSequenceShard::TEvGetSequence::EventType:
+ ++getSequenceEvents;
+ break;
default:
break;
@@ -174,16 +236,36 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
};
auto prevObserver = runtime.SetObserverFunc(observerFunc);
- auto sender = runtime.AllocateEdgeActor(0);
+ auto pathId = DescribeSequence(runtime, sender, "/dc-1/seq");
+
for (int i = 0; i < 7; ++i) {
SendNextValRequest(runtime, sender, "/dc-1/seq");
}
for (int i = 0; i < 7; ++i) {
- i64 value = WaitNextValResult(runtime, sender);
+ value = WaitNextValResult(runtime, sender);
UNIT_ASSERT_VALUES_EQUAL(value, 2 + i);
}
+ UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvSequenceShard::TEvAllocateSequence events: " << allocateEvents);
+
+ for (int i = 0; i < 7; ++i) {
+ SendGetSequenceRequest(runtime, sender, pathId);
+ }
+ for (int i = 0; i < 7; ++i) {
+ value = WaitGetSequenceResult(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(value, 9);
+ }
+ UNIT_ASSERT_C(getSequenceEvents < 7, "Too many TEvSequenceShard::TEvGetSequence events: " << getSequenceEvents);
+ ui64 prevGetSequenceEvents = getSequenceEvents;
+
+ for (int i = 0; i < 7; ++i) {
+ SendNextValRequest(runtime, sender, "/dc-1/seq");
+ SendGetSequenceRequest(runtime, sender, pathId);
+ }
+ for (int i = 0; i < 7; ++i) {
+ WaitGetSequenceResult(runtime, sender);
+ }
- UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvAllocateSequence events: " << allocateEvents);
+ UNIT_ASSERT_C(getSequenceEvents < 7 + prevGetSequenceEvents, "Too many TEvSequenceShard::TEvGetSequence events: " << getSequenceEvents - prevGetSequenceEvents);
}
Y_UNIT_TEST(DropRecreate) {
diff --git a/ydb/core/tx/sequenceproxy/ya.make b/ydb/core/tx/sequenceproxy/ya.make
index 4b904b7427b..02988d04bc1 100644
--- a/ydb/core/tx/sequenceproxy/ya.make
+++ b/ydb/core/tx/sequenceproxy/ya.make
@@ -10,6 +10,7 @@ PEERDIR(
SRCS(
sequenceproxy.cpp
sequenceproxy_allocate.cpp
+ sequenceproxy_get_sequence.cpp
sequenceproxy_impl.cpp
sequenceproxy_resolve.cpp
)