aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-10-05 12:21:17 +0300
committersnaury <snaury@ydb.tech>2023-10-05 12:45:44 +0300
commit2aeb1284cb0378d71f2246cecd51c06dc02e6a0c (patch)
tree002377d3db2df76fe10e95af732f4df57f8c608f
parent0e697a89baaa433be9b648eaf849449354b95b04 (diff)
downloadydb-2aeb1284cb0378d71f2246cecd51c06dc02e6a0c.tar.gz
Resolve sequence from cache before every string path request KIKIMR-19545
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp6
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.h8
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp2
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp100
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_impl.h32
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp161
-rw-r--r--ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp47
7 files changed, 188 insertions, 168 deletions
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 59a0671879e..5da76e0423b 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -247,7 +247,7 @@ namespace {
<< ", recipient# " << Context->Sender
<< ", result# " << Context->Request->ToString(*AppData()->TypeRegistry));
- this->Send(Context->Sender, new TEvResult(Context->Request.Release()));
+ this->Send(Context->Sender, new TEvResult(Context->Request.Release()), 0, Context->Cookie);
this->PassAway();
}
@@ -2500,7 +2500,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
template <typename TContext, typename TEvent>
TIntrusivePtr<TContext> MakeContext(TEvent& ev) const {
- TIntrusivePtr<TContext> context(new TContext(ev->Sender, ev->Get()->Request, Now()));
+ TIntrusivePtr<TContext> context(new TContext(ev->Sender, ev->Cookie, ev->Get()->Request, Now()));
if (context->Request->DatabaseName) {
if (auto* db = Cache.FindPtr(CanonizePath(context->Request->DatabaseName))) {
@@ -2603,7 +2603,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
void Handle(TEvTxProxySchemeCache::TEvInvalidateTable::TPtr& ev) {
SBC_LOG_D("Handle TEvTxProxySchemeCache::TEvInvalidateTable"
<< ": self# " << SelfId());
- Send(ev->Sender, new TEvTxProxySchemeCache::TEvInvalidateTableResult(ev->Get()->Sender));
+ Send(ev->Sender, new TEvTxProxySchemeCache::TEvInvalidateTableResult(ev->Get()->Sender), 0, ev->Cookie);
}
TActorId EnsureWatchCache() {
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index c3e8a9df21e..ad865e6da5b 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -376,13 +376,15 @@ struct TSchemeCacheRequest {
struct TSchemeCacheRequestContext : TAtomicRefCount<TSchemeCacheRequestContext>, TNonCopyable {
TActorId Sender;
+ ui64 Cookie;
ui64 WaitCounter;
TAutoPtr<TSchemeCacheRequest> Request;
const TInstant CreatedAt;
TIntrusivePtr<TDomainInfo> ResolvedDomainInfo; // resolved from DatabaseName
- TSchemeCacheRequestContext(const TActorId& sender, TAutoPtr<TSchemeCacheRequest> request, const TInstant& now = TInstant::Now())
+ TSchemeCacheRequestContext(const TActorId& sender, ui64 cookie, TAutoPtr<TSchemeCacheRequest> request, const TInstant& now = TInstant::Now())
: Sender(sender)
+ , Cookie(cookie)
, WaitCounter(0)
, Request(request)
, CreatedAt(now)
@@ -391,13 +393,15 @@ struct TSchemeCacheRequestContext : TAtomicRefCount<TSchemeCacheRequestContext>,
struct TSchemeCacheNavigateContext : TAtomicRefCount<TSchemeCacheNavigateContext>, TNonCopyable {
TActorId Sender;
+ ui64 Cookie;
ui64 WaitCounter;
TAutoPtr<TSchemeCacheNavigate> Request;
const TInstant CreatedAt;
TIntrusivePtr<TDomainInfo> ResolvedDomainInfo; // resolved from DatabaseName
- TSchemeCacheNavigateContext(const TActorId& sender, TAutoPtr<TSchemeCacheNavigate> request, const TInstant& now = TInstant::Now())
+ TSchemeCacheNavigateContext(const TActorId& sender, ui64 cookie, TAutoPtr<TSchemeCacheNavigate> request, const TInstant& now = TInstant::Now())
: Sender(sender)
+ , Cookie(cookie)
, WaitCounter(0)
, Request(request)
, CreatedAt(now)
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
index 4f3a8f645ac..791e4c2e862 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
@@ -9,7 +9,7 @@
#include <util/string/builder.h>
#define TXLOG_LOG(priority, stream) \
- LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream)
+ LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream)
#define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream)
#define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream)
#define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream)
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
index f421ecf8dd7..d3a7de35145 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
@@ -7,7 +7,7 @@
#include <util/string/builder.h>
#define TXLOG_LOG(priority, stream) \
- LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream)
+ LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream)
#define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream)
#define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream)
#define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream)
@@ -37,18 +37,18 @@ namespace NSequenceProxy {
msg->Path);
}
- void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
- auto& info = Databases[database].SequenceByName[path];
- if (!info.ResolveInProgress) {
+ void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
+ if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
+ info.PendingNextValResolve = std::move(info.NewNextValResolve);
StartResolve(database, path, !info.PathId);
info.ResolveInProgress = true;
}
- if (!info.PathId) {
- info.PendingNextValResolve.emplace_back(std::move(request));
- return;
- }
+ }
- DoNextVal(std::move(request), database, info.PathId, /* needRefresh */ false);
+ void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
+ auto& info = Databases[database].SequenceByName[path];
+ info.NewNextValResolve.emplace_back(std::move(request));
+ MaybeStartResolve(database, path, info);
}
void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) {
@@ -77,80 +77,74 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}
- void TSequenceProxy::Handle(TEvPrivate::TEvResolveResult::TPtr& ev) {
- auto* msg = ev->Get();
- auto it = ResolveInFlight.find(ev->Cookie);
- Y_VERIFY(it != ResolveInFlight.end());
- auto database = it->second.Database;
- auto path = it->second.Path;
- ResolveInFlight.erase(it);
+ void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
+ auto& info = Databases[database].SequenceByName[path];
+ Y_VERIFY(info.ResolveInProgress);
+ info.ResolveInProgress = false;
- std::visit(
- [&](const auto& path) {
- OnResolveResult(database, path, msg);
- },
- path);
+ while (!info.PendingNextValResolve.empty()) {
+ const auto& request = info.PendingNextValResolve.front();
+ Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
+ info.PendingNextValResolve.pop_front();
+ }
+
+ MaybeStartResolve(database, path, info);
}
- void TSequenceProxy::OnResolveResult(const TString& database, const TString& path, TEvPrivate::TEvResolveResult* msg) {
+ void TSequenceProxy::OnResolveResult(const TString& database, const TString& path, TResolveResult&& result) {
auto& info = Databases[database].SequenceByName[path];
Y_VERIFY(info.ResolveInProgress);
info.ResolveInProgress = false;
- if (msg->Status != Ydb::StatusIds::SUCCESS) {
- while (!info.PendingNextValResolve.empty()) {
- const auto& request = info.PendingNextValResolve.front();
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
- info.PendingNextValResolve.pop_front();
- }
- return;
- }
-
- auto pathId = msg->PathId;
+ auto pathId = result.PathId;
Y_VERIFY(pathId);
info.PathId = pathId;
- Y_VERIFY(msg->SequenceInfo);
+ Y_VERIFY(result.SequenceInfo);
auto& infoById = Databases[database].SequenceByPathId[pathId];
- infoById.SequenceInfo = msg->SequenceInfo;
- infoById.SecurityObject = msg->SecurityObject;
- infoById.PendingNextValResolve.splice(infoById.PendingNextValResolve.end(), info.PendingNextValResolve);
- OnResolved(database, pathId, infoById);
+ infoById.SequenceInfo = result.SequenceInfo;
+ infoById.SecurityObject = result.SecurityObject;
+ OnResolved(database, pathId, infoById, info.PendingNextValResolve);
+
+ MaybeStartResolve(database, path, info);
}
- void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TEvPrivate::TEvResolveResult* msg) {
+ void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
auto& info = Databases[database].SequenceByPathId[pathId];
Y_VERIFY(info.ResolveInProgress);
info.ResolveInProgress = false;
- if (msg->Status != Ydb::StatusIds::SUCCESS) {
- while (!info.PendingNextValResolve.empty()) {
- const auto& request = info.PendingNextValResolve.front();
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
- info.PendingNextValResolve.pop_front();
- }
- return;
+ while (!info.PendingNextValResolve.empty()) {
+ const auto& request = info.PendingNextValResolve.front();
+ Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
+ info.PendingNextValResolve.pop_front();
}
+ }
+
+ void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result) {
+ auto& info = Databases[database].SequenceByPathId[pathId];
+ Y_VERIFY(info.ResolveInProgress);
+ info.ResolveInProgress = false;
- Y_VERIFY(msg->SequenceInfo);
- info.SequenceInfo = msg->SequenceInfo;
- info.SecurityObject = msg->SecurityObject;
- OnResolved(database, pathId, info);
+ Y_VERIFY(result.SequenceInfo);
+ info.SequenceInfo = result.SequenceInfo;
+ info.SecurityObject = result.SecurityObject;
+ OnResolved(database, pathId, info, info.PendingNextValResolve);
}
- void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info) {
+ void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved) {
info.LastKnownTabletId = info.SequenceInfo->Description.GetSequenceShard();
info.DefaultCacheSize = Max(info.SequenceInfo->Description.GetCache(), ui64(1));
- while (!info.PendingNextValResolve.empty()) {
- auto& request = info.PendingNextValResolve.front();
+ while (!resolved.empty()) {
+ auto& request = resolved.front();
if (!DoMaybeReplyUnauthorized(request, pathId, info)) {
info.PendingNextVal.emplace_back(std::move(request));
++info.TotalRequested;
}
- info.PendingNextValResolve.pop_back();
+ resolved.pop_back();
}
OnChanged(database, pathId, info);
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
index 9d066460261..afa5b814951 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
@@ -26,25 +26,16 @@ namespace NSequenceProxy {
void Bootstrap();
private:
- class TResolveActor;
class TAllocateActor;
using TSequenceInfo = NSchemeCache::TSchemeCacheNavigate::TSequenceInfo;
struct TEvPrivate {
enum EEv {
- EvResolveResult = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvBegin = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
EvAllocateResult,
};
- struct TEvResolveResult : public TEventLocal<TEvResolveResult, EvResolveResult> {
- Ydb::StatusIds::StatusCode Status;
- NYql::TIssues Issues;
- TPathId PathId;
- TIntrusiveConstPtr<TSequenceInfo> SequenceInfo;
- TIntrusivePtr<TSecurityObject> SecurityObject;
- };
-
struct TEvAllocateResult : public TEventLocal<TEvAllocateResult, EvAllocateResult> {
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
@@ -56,6 +47,13 @@ namespace NSequenceProxy {
};
private:
+ struct TResolveResult {
+ TPathId PathId;
+ TIntrusiveConstPtr<TSequenceInfo> SequenceInfo;
+ TIntrusivePtr<TSecurityObject> SecurityObject;
+ };
+
+ private:
struct TNextValRequestInfo {
TActorId Sender;
ui64 Cookie;
@@ -93,6 +91,7 @@ namespace NSequenceProxy {
// When requests come using sequence name they end up here first
struct TSequenceByName {
TPathId PathId;
+ TList<TNextValRequestInfo> NewNextValResolve;
TList<TNextValRequestInfo> PendingNextValResolve;
bool ResolveInProgress = false;
};
@@ -118,23 +117,26 @@ namespace NSequenceProxy {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, HandlePoison);
hFunc(TEvSequenceProxy::TEvNextVal, Handle);
- hFunc(TEvPrivate::TEvResolveResult, Handle);
hFunc(TEvPrivate::TEvAllocateResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
}
}
void HandlePoison();
void Handle(TEvSequenceProxy::TEvNextVal::TPtr& ev);
- void Handle(TEvPrivate::TEvResolveResult::TPtr& ev);
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
+ void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
- void OnResolveResult(const TString& database, const TString& path, TEvPrivate::TEvResolveResult* msg);
- void OnResolveResult(const TString& database, const TPathId& pathId, TEvPrivate::TEvResolveResult* msg);
- void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info);
+ void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
+ void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
+ void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
+ void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
+ void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
void OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info);
bool DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info);
bool DoReplyFromCache(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info);
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp
index 3f18cc08061..8ffe66e554e 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp
@@ -9,7 +9,7 @@
#include <util/string/builder.h>
#define TXLOG_LOG(priority, stream) \
- LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream)
+ LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream)
#define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream)
#define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream)
#define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream)
@@ -17,122 +17,97 @@
namespace NKikimr {
namespace NSequenceProxy {
- class TSequenceProxy::TResolveActor : public TActorBootstrapped<TResolveActor> {
+ namespace {
+
using TSchemeCacheNavigate = NSchemeCache::TSchemeCacheNavigate;
using TEntry = TSchemeCacheNavigate::TEntry;
using ERequestType = TEntry::ERequestType;
- private:
- struct TOutputHelper {
- const std::variant<TString, TPathId>& value;
-
- friend inline IOutputStream& operator<<(IOutputStream& out, const TOutputHelper& helper) {
- std::visit(
- [&out](const auto& value) {
- out << value;
- },
- helper.value);
- return out;
- }
- };
-
- public:
- TResolveActor(
- TActorId owner, ui64 cookie,
- const TString& database,
- const std::variant<TString, TPathId>& path,
- bool syncVersion)
- : Owner(owner)
- , Cookie(cookie)
- , Database(database)
- , Path(path)
- , SyncVersion(syncVersion)
- { }
-
- void Bootstrap() {
- auto schemeCache = MakeSchemeCacheID();
- auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- req->DatabaseName = Database;
- auto& entry = req->ResultSet.emplace_back();
- std::visit(
- [&entry](const auto& path) {
- InitPath(entry, path);
- },
- Path);
- entry.ShowPrivatePath = true;
- entry.SyncVersion = SyncVersion;
- Send(schemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release()));
- Become(&TThis::StateWork);
- }
-
- static void InitPath(TEntry& entry, const TString& path) {
+ void InitPath(TEntry& entry, const TString& path) {
entry.Path = SplitPath(path);
entry.RequestType = ERequestType::ByPath;
}
- static void InitPath(TEntry& entry, const TPathId& pathId) {
+ void InitPath(TEntry& entry, const TPathId& pathId) {
entry.TableId.PathId = pathId;
entry.RequestType = ERequestType::ByTableId;
}
- private:
- void ReplyAndDie(Ydb::StatusIds::StatusCode status, const TString& error) {
- IssueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, error));
- auto res = MakeHolder<TEvPrivate::TEvResolveResult>();
- res->Status = status;
- res->Issues = IssueManager.GetIssues();
- Send(Owner, res.Release(), 0, Cookie);
- PassAway();
- }
-
- private:
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- }
+ void InitPath(TEntry& entry, const std::variant<TString, TPathId>& path) {
+ std::visit(
+ [&entry](const auto& path) {
+ InitPath(entry, path);
+ },
+ path);
}
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- auto req = std::move(ev->Get()->Request);
- if (req->ErrorCount > 0) {
- ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder()
- << "Failed to resolve sequence " << TOutputHelper{ Path });
- return;
- }
-
- auto& res = req->ResultSet.at(0);
- if (!res.SequenceInfo || res.SequenceInfo->Description.GetSequenceShard() == 0) {
- ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder()
- << "Failed to resolve sequence " << TOutputHelper{ Path });
- return;
- }
-
- auto reply = MakeHolder<TEvPrivate::TEvResolveResult>();
- reply->Status = Ydb::StatusIds::SUCCESS;
- reply->PathId = res.TableId.PathId;
- reply->SequenceInfo = res.SequenceInfo;
- reply->SecurityObject = res.SecurityObject;
- Send(Owner, reply.Release(), 0, Cookie);
- PassAway();
+ NYql::TIssues MakeResolveIssues(const TString& message) {
+ NYql::TIssueManager issueManager;
+ issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, message));
+ return issueManager.GetIssues();
}
- private:
- const TActorId Owner;
- const ui64 Cookie;
- const TString Database;
- const std::variant<TString, TPathId> Path;
- const bool SyncVersion;
- NYql::TIssueManager IssueManager;
- };
+ } // namespace
ui64 TSequenceProxy::StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion) {
ui64 cookie = ++LastCookie;
auto& info = ResolveInFlight[cookie];
info.Database = database;
info.Path = path;
- Register(new TResolveActor(SelfId(), cookie, database, path, syncVersion));
+
+ auto schemeCache = MakeSchemeCacheID();
+ auto req = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+ req->DatabaseName = database;
+ auto& entry = req->ResultSet.emplace_back();
+ InitPath(entry, path);
+ entry.ShowPrivatePath = true;
+ entry.SyncVersion = syncVersion;
+ Send(schemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(req.release()), 0, cookie);
+
return cookie;
}
+ void TSequenceProxy::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ ui64 cookie = ev->Cookie;
+ auto it = ResolveInFlight.find(cookie);
+ Y_VERIFY(it != ResolveInFlight.end(), "TEvNavigateKeySetResult with cookie %" PRIu64 " does not match a previous request", cookie);
+ auto database = std::move(it->second.Database);
+ auto path = std::move(it->second.Path);
+ ResolveInFlight.erase(it);
+
+ auto req = std::move(ev->Get()->Request);
+ if (req->ErrorCount > 0) {
+ std::visit(
+ [this, &database](auto& path) {
+ OnResolveError(database, path, Ydb::StatusIds::SCHEME_ERROR, MakeResolveIssues(TStringBuilder()
+ << "Failed to resolve sequence " << path));
+ },
+ path);
+ return;
+ }
+
+ auto& entry = req->ResultSet.at(0);
+ if (!entry.SequenceInfo || entry.SequenceInfo->Description.GetSequenceShard() == 0) {
+ std::visit(
+ [this, &database](auto& path) {
+ OnResolveError(database, path, Ydb::StatusIds::SCHEME_ERROR, MakeResolveIssues(TStringBuilder()
+ << "Failed to resolve sequence " << path));
+ },
+ path);
+ return;
+ }
+
+ TResolveResult result{
+ entry.TableId.PathId,
+ std::move(entry.SequenceInfo),
+ std::move(entry.SecurityObject),
+ };
+ std::visit(
+ [this, &database, &result](auto& path) {
+ OnResolveResult(database, path, std::move(result));
+ },
+ path);
+ }
+
} // namespace NSequenceProxy
} // namespace NKikimr
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
index d331fdef4ae..a7292ab30b7 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp
@@ -103,6 +103,28 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge);
}
+ void DropSequence(TTestActorRuntime& runtime, const TString& workingDir, const TString& name) {
+ auto edge = runtime.AllocateEdgeActor(0);
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ auto* tx = request->Record.MutableTransaction()->MutableModifyScheme();
+ tx->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence);
+ tx->SetWorkingDir(workingDir);
+ auto* op = tx->MutableDrop();
+ op->SetName(name);
+ runtime.Send(new IEventHandle(MakeTxProxyID(), edge, request.Release()));
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(edge);
+ auto* msg = ev->Get();
+ const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
+ Y_VERIFY(status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
+
+ ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId();
+ auto notifyReq = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
+ notifyReq->Record.SetTxId(msg->Record.GetTxId());
+ runtime.SendToPipe(schemeShardTabletId, edge, notifyReq.Release());
+ runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge);
+ }
+
void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) {
auto request = MakeHolder<TEvSequenceProxy::TEvNextVal>(path);
runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release()));
@@ -111,7 +133,7 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
- Y_VERIFY(msg->Status == expectedStatus);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Status == Ydb::StatusIds::SUCCESS ? msg->Value : 0;
}
@@ -163,6 +185,29 @@ Y_UNIT_TEST_SUITE(SequenceProxy) {
UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvAllocateSequence events: " << allocateEvents);
}
+ Y_UNIT_TEST(DropRecreate) {
+ TTenantTestRuntime runtime(MakeTenantTestConfig(false));
+ StartSchemeCache(runtime);
+
+ CreateSequence(runtime, "/dc-1", R"(
+ Name: "seq"
+ )");
+
+ i64 value = DoNextVal(runtime, "/dc-1/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 1);
+
+ DropSequence(runtime, "/dc-1", "seq");
+
+ DoNextVal(runtime, "/dc-1/seq", Ydb::StatusIds::SCHEME_ERROR);
+
+ CreateSequence(runtime, "/dc-1", R"(
+ Name: "seq"
+ )");
+
+ value = DoNextVal(runtime, "/dc-1/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 1);
+ }
+
} // Y_UNIT_TEST_SUITE(SequenceProxy)
} // namespace NSequenceProxy