diff options
author | snaury <snaury@ydb.tech> | 2023-10-05 12:21:17 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-10-05 12:45:44 +0300 |
commit | 2aeb1284cb0378d71f2246cecd51c06dc02e6a0c (patch) | |
tree | 002377d3db2df76fe10e95af732f4df57f8c608f | |
parent | 0e697a89baaa433be9b648eaf849449354b95b04 (diff) | |
download | ydb-2aeb1284cb0378d71f2246cecd51c06dc02e6a0c.tar.gz |
Resolve sequence from cache before every string path request KIKIMR-19545
-rw-r--r-- | ydb/core/tx/scheme_board/cache.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/scheme_cache.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp | 100 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.h | 32 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp | 161 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp | 47 |
7 files changed, 188 insertions, 168 deletions
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 59a0671879e..5da76e0423b 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -247,7 +247,7 @@ namespace { << ", recipient# " << Context->Sender << ", result# " << Context->Request->ToString(*AppData()->TypeRegistry)); - this->Send(Context->Sender, new TEvResult(Context->Request.Release())); + this->Send(Context->Sender, new TEvResult(Context->Request.Release()), 0, Context->Cookie); this->PassAway(); } @@ -2500,7 +2500,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { template <typename TContext, typename TEvent> TIntrusivePtr<TContext> MakeContext(TEvent& ev) const { - TIntrusivePtr<TContext> context(new TContext(ev->Sender, ev->Get()->Request, Now())); + TIntrusivePtr<TContext> context(new TContext(ev->Sender, ev->Cookie, ev->Get()->Request, Now())); if (context->Request->DatabaseName) { if (auto* db = Cache.FindPtr(CanonizePath(context->Request->DatabaseName))) { @@ -2603,7 +2603,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { void Handle(TEvTxProxySchemeCache::TEvInvalidateTable::TPtr& ev) { SBC_LOG_D("Handle TEvTxProxySchemeCache::TEvInvalidateTable" << ": self# " << SelfId()); - Send(ev->Sender, new TEvTxProxySchemeCache::TEvInvalidateTableResult(ev->Get()->Sender)); + Send(ev->Sender, new TEvTxProxySchemeCache::TEvInvalidateTableResult(ev->Get()->Sender), 0, ev->Cookie); } TActorId EnsureWatchCache() { diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index c3e8a9df21e..ad865e6da5b 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -376,13 +376,15 @@ struct TSchemeCacheRequest { struct TSchemeCacheRequestContext : TAtomicRefCount<TSchemeCacheRequestContext>, TNonCopyable { TActorId Sender; + ui64 Cookie; ui64 WaitCounter; TAutoPtr<TSchemeCacheRequest> Request; const TInstant CreatedAt; TIntrusivePtr<TDomainInfo> ResolvedDomainInfo; // resolved from DatabaseName - TSchemeCacheRequestContext(const TActorId& sender, TAutoPtr<TSchemeCacheRequest> request, const TInstant& now = TInstant::Now()) + TSchemeCacheRequestContext(const TActorId& sender, ui64 cookie, TAutoPtr<TSchemeCacheRequest> request, const TInstant& now = TInstant::Now()) : Sender(sender) + , Cookie(cookie) , WaitCounter(0) , Request(request) , CreatedAt(now) @@ -391,13 +393,15 @@ struct TSchemeCacheRequestContext : TAtomicRefCount<TSchemeCacheRequestContext>, struct TSchemeCacheNavigateContext : TAtomicRefCount<TSchemeCacheNavigateContext>, TNonCopyable { TActorId Sender; + ui64 Cookie; ui64 WaitCounter; TAutoPtr<TSchemeCacheNavigate> Request; const TInstant CreatedAt; TIntrusivePtr<TDomainInfo> ResolvedDomainInfo; // resolved from DatabaseName - TSchemeCacheNavigateContext(const TActorId& sender, TAutoPtr<TSchemeCacheNavigate> request, const TInstant& now = TInstant::Now()) + TSchemeCacheNavigateContext(const TActorId& sender, ui64 cookie, TAutoPtr<TSchemeCacheNavigate> request, const TInstant& now = TInstant::Now()) : Sender(sender) + , Cookie(cookie) , WaitCounter(0) , Request(request) , CreatedAt(now) diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp index 4f3a8f645ac..791e4c2e862 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp @@ -9,7 +9,7 @@ #include <util/string/builder.h> #define TXLOG_LOG(priority, stream) \ - LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream) + LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream) #define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream) #define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream) #define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream) diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp index f421ecf8dd7..d3a7de35145 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp @@ -7,7 +7,7 @@ #include <util/string/builder.h> #define TXLOG_LOG(priority, stream) \ - LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream) + LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream) #define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream) #define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream) #define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream) @@ -37,18 +37,18 @@ namespace NSequenceProxy { msg->Path); } - void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) { - auto& info = Databases[database].SequenceByName[path]; - if (!info.ResolveInProgress) { + void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) { + if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) { + info.PendingNextValResolve = std::move(info.NewNextValResolve); StartResolve(database, path, !info.PathId); info.ResolveInProgress = true; } - if (!info.PathId) { - info.PendingNextValResolve.emplace_back(std::move(request)); - return; - } + } - DoNextVal(std::move(request), database, info.PathId, /* needRefresh */ false); + void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) { + auto& info = Databases[database].SequenceByName[path]; + info.NewNextValResolve.emplace_back(std::move(request)); + MaybeStartResolve(database, path, info); } void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) { @@ -77,80 +77,74 @@ namespace NSequenceProxy { OnChanged(database, pathId, info); } - void TSequenceProxy::Handle(TEvPrivate::TEvResolveResult::TPtr& ev) { - auto* msg = ev->Get(); - auto it = ResolveInFlight.find(ev->Cookie); - Y_VERIFY(it != ResolveInFlight.end()); - auto database = it->second.Database; - auto path = it->second.Path; - ResolveInFlight.erase(it); + void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + auto& info = Databases[database].SequenceByName[path]; + Y_VERIFY(info.ResolveInProgress); + info.ResolveInProgress = false; - std::visit( - [&](const auto& path) { - OnResolveResult(database, path, msg); - }, - path); + while (!info.PendingNextValResolve.empty()) { + const auto& request = info.PendingNextValResolve.front(); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + info.PendingNextValResolve.pop_front(); + } + + MaybeStartResolve(database, path, info); } - void TSequenceProxy::OnResolveResult(const TString& database, const TString& path, TEvPrivate::TEvResolveResult* msg) { + void TSequenceProxy::OnResolveResult(const TString& database, const TString& path, TResolveResult&& result) { auto& info = Databases[database].SequenceByName[path]; Y_VERIFY(info.ResolveInProgress); info.ResolveInProgress = false; - if (msg->Status != Ydb::StatusIds::SUCCESS) { - while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie); - info.PendingNextValResolve.pop_front(); - } - return; - } - - auto pathId = msg->PathId; + auto pathId = result.PathId; Y_VERIFY(pathId); info.PathId = pathId; - Y_VERIFY(msg->SequenceInfo); + Y_VERIFY(result.SequenceInfo); auto& infoById = Databases[database].SequenceByPathId[pathId]; - infoById.SequenceInfo = msg->SequenceInfo; - infoById.SecurityObject = msg->SecurityObject; - infoById.PendingNextValResolve.splice(infoById.PendingNextValResolve.end(), info.PendingNextValResolve); - OnResolved(database, pathId, infoById); + infoById.SequenceInfo = result.SequenceInfo; + infoById.SecurityObject = result.SecurityObject; + OnResolved(database, pathId, infoById, info.PendingNextValResolve); + + MaybeStartResolve(database, path, info); } - void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TEvPrivate::TEvResolveResult* msg) { + void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { auto& info = Databases[database].SequenceByPathId[pathId]; Y_VERIFY(info.ResolveInProgress); info.ResolveInProgress = false; - if (msg->Status != Ydb::StatusIds::SUCCESS) { - while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie); - info.PendingNextValResolve.pop_front(); - } - return; + while (!info.PendingNextValResolve.empty()) { + const auto& request = info.PendingNextValResolve.front(); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + info.PendingNextValResolve.pop_front(); } + } + + void TSequenceProxy::OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result) { + auto& info = Databases[database].SequenceByPathId[pathId]; + Y_VERIFY(info.ResolveInProgress); + info.ResolveInProgress = false; - Y_VERIFY(msg->SequenceInfo); - info.SequenceInfo = msg->SequenceInfo; - info.SecurityObject = msg->SecurityObject; - OnResolved(database, pathId, info); + Y_VERIFY(result.SequenceInfo); + info.SequenceInfo = result.SequenceInfo; + info.SecurityObject = result.SecurityObject; + OnResolved(database, pathId, info, info.PendingNextValResolve); } - void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info) { + void TSequenceProxy::OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved) { info.LastKnownTabletId = info.SequenceInfo->Description.GetSequenceShard(); info.DefaultCacheSize = Max(info.SequenceInfo->Description.GetCache(), ui64(1)); - while (!info.PendingNextValResolve.empty()) { - auto& request = info.PendingNextValResolve.front(); + while (!resolved.empty()) { + auto& request = resolved.front(); if (!DoMaybeReplyUnauthorized(request, pathId, info)) { info.PendingNextVal.emplace_back(std::move(request)); ++info.TotalRequested; } - info.PendingNextValResolve.pop_back(); + resolved.pop_back(); } OnChanged(database, pathId, info); diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h index 9d066460261..afa5b814951 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h @@ -26,25 +26,16 @@ namespace NSequenceProxy { void Bootstrap(); private: - class TResolveActor; class TAllocateActor; using TSequenceInfo = NSchemeCache::TSchemeCacheNavigate::TSequenceInfo; struct TEvPrivate { enum EEv { - EvResolveResult = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvBegin = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), EvAllocateResult, }; - struct TEvResolveResult : public TEventLocal<TEvResolveResult, EvResolveResult> { - Ydb::StatusIds::StatusCode Status; - NYql::TIssues Issues; - TPathId PathId; - TIntrusiveConstPtr<TSequenceInfo> SequenceInfo; - TIntrusivePtr<TSecurityObject> SecurityObject; - }; - struct TEvAllocateResult : public TEventLocal<TEvAllocateResult, EvAllocateResult> { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; @@ -56,6 +47,13 @@ namespace NSequenceProxy { }; private: + struct TResolveResult { + TPathId PathId; + TIntrusiveConstPtr<TSequenceInfo> SequenceInfo; + TIntrusivePtr<TSecurityObject> SecurityObject; + }; + + private: struct TNextValRequestInfo { TActorId Sender; ui64 Cookie; @@ -93,6 +91,7 @@ namespace NSequenceProxy { // When requests come using sequence name they end up here first struct TSequenceByName { TPathId PathId; + TList<TNextValRequestInfo> NewNextValResolve; TList<TNextValRequestInfo> PendingNextValResolve; bool ResolveInProgress = false; }; @@ -118,23 +117,26 @@ namespace NSequenceProxy { switch (ev->GetTypeRewrite()) { sFunc(TEvents::TEvPoison, HandlePoison); hFunc(TEvSequenceProxy::TEvNextVal, Handle); - hFunc(TEvPrivate::TEvResolveResult, Handle); hFunc(TEvPrivate::TEvAllocateResult, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); } } void HandlePoison(); void Handle(TEvSequenceProxy::TEvNextVal::TPtr& ev); - void Handle(TEvPrivate::TEvResolveResult::TPtr& ev); void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev); + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion); ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache); + void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true); - void OnResolveResult(const TString& database, const TString& path, TEvPrivate::TEvResolveResult* msg); - void OnResolveResult(const TString& database, const TPathId& pathId, TEvPrivate::TEvResolveResult* msg); - void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info); + void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); + void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); + void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result); + void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result); + void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved); void OnChanged(const TString& database, const TPathId& pathId, TSequenceByPathId& info); bool DoMaybeReplyUnauthorized(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info); bool DoReplyFromCache(const TNextValRequestInfo& request, const TPathId& pathId, TSequenceByPathId& info); diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp index 3f18cc08061..8ffe66e554e 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_resolve.cpp @@ -9,7 +9,7 @@ #include <util/string/builder.h> #define TXLOG_LOG(priority, stream) \ - LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::LONG_TX_SERVICE, LogPrefix << stream) + LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::SEQUENCEPROXY, LogPrefix << stream) #define TXLOG_DEBUG(stream) TXLOG_LOG(NActors::NLog::PRI_DEBUG, stream) #define TXLOG_NOTICE(stream) TXLOG_LOG(NActors::NLog::PRI_NOTICE, stream) #define TXLOG_ERROR(stream) TXLOG_LOG(NActors::NLog::PRI_ERROR, stream) @@ -17,122 +17,97 @@ namespace NKikimr { namespace NSequenceProxy { - class TSequenceProxy::TResolveActor : public TActorBootstrapped<TResolveActor> { + namespace { + using TSchemeCacheNavigate = NSchemeCache::TSchemeCacheNavigate; using TEntry = TSchemeCacheNavigate::TEntry; using ERequestType = TEntry::ERequestType; - private: - struct TOutputHelper { - const std::variant<TString, TPathId>& value; - - friend inline IOutputStream& operator<<(IOutputStream& out, const TOutputHelper& helper) { - std::visit( - [&out](const auto& value) { - out << value; - }, - helper.value); - return out; - } - }; - - public: - TResolveActor( - TActorId owner, ui64 cookie, - const TString& database, - const std::variant<TString, TPathId>& path, - bool syncVersion) - : Owner(owner) - , Cookie(cookie) - , Database(database) - , Path(path) - , SyncVersion(syncVersion) - { } - - void Bootstrap() { - auto schemeCache = MakeSchemeCacheID(); - auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - req->DatabaseName = Database; - auto& entry = req->ResultSet.emplace_back(); - std::visit( - [&entry](const auto& path) { - InitPath(entry, path); - }, - Path); - entry.ShowPrivatePath = true; - entry.SyncVersion = SyncVersion; - Send(schemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release())); - Become(&TThis::StateWork); - } - - static void InitPath(TEntry& entry, const TString& path) { + void InitPath(TEntry& entry, const TString& path) { entry.Path = SplitPath(path); entry.RequestType = ERequestType::ByPath; } - static void InitPath(TEntry& entry, const TPathId& pathId) { + void InitPath(TEntry& entry, const TPathId& pathId) { entry.TableId.PathId = pathId; entry.RequestType = ERequestType::ByTableId; } - private: - void ReplyAndDie(Ydb::StatusIds::StatusCode status, const TString& error) { - IssueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, error)); - auto res = MakeHolder<TEvPrivate::TEvResolveResult>(); - res->Status = status; - res->Issues = IssueManager.GetIssues(); - Send(Owner, res.Release(), 0, Cookie); - PassAway(); - } - - private: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - } + void InitPath(TEntry& entry, const std::variant<TString, TPathId>& path) { + std::visit( + [&entry](const auto& path) { + InitPath(entry, path); + }, + path); } - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - auto req = std::move(ev->Get()->Request); - if (req->ErrorCount > 0) { - ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() - << "Failed to resolve sequence " << TOutputHelper{ Path }); - return; - } - - auto& res = req->ResultSet.at(0); - if (!res.SequenceInfo || res.SequenceInfo->Description.GetSequenceShard() == 0) { - ReplyAndDie(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() - << "Failed to resolve sequence " << TOutputHelper{ Path }); - return; - } - - auto reply = MakeHolder<TEvPrivate::TEvResolveResult>(); - reply->Status = Ydb::StatusIds::SUCCESS; - reply->PathId = res.TableId.PathId; - reply->SequenceInfo = res.SequenceInfo; - reply->SecurityObject = res.SecurityObject; - Send(Owner, reply.Release(), 0, Cookie); - PassAway(); + NYql::TIssues MakeResolveIssues(const TString& message) { + NYql::TIssueManager issueManager; + issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, message)); + return issueManager.GetIssues(); } - private: - const TActorId Owner; - const ui64 Cookie; - const TString Database; - const std::variant<TString, TPathId> Path; - const bool SyncVersion; - NYql::TIssueManager IssueManager; - }; + } // namespace ui64 TSequenceProxy::StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion) { ui64 cookie = ++LastCookie; auto& info = ResolveInFlight[cookie]; info.Database = database; info.Path = path; - Register(new TResolveActor(SelfId(), cookie, database, path, syncVersion)); + + auto schemeCache = MakeSchemeCacheID(); + auto req = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); + req->DatabaseName = database; + auto& entry = req->ResultSet.emplace_back(); + InitPath(entry, path); + entry.ShowPrivatePath = true; + entry.SyncVersion = syncVersion; + Send(schemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(req.release()), 0, cookie); + return cookie; } + void TSequenceProxy::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + ui64 cookie = ev->Cookie; + auto it = ResolveInFlight.find(cookie); + Y_VERIFY(it != ResolveInFlight.end(), "TEvNavigateKeySetResult with cookie %" PRIu64 " does not match a previous request", cookie); + auto database = std::move(it->second.Database); + auto path = std::move(it->second.Path); + ResolveInFlight.erase(it); + + auto req = std::move(ev->Get()->Request); + if (req->ErrorCount > 0) { + std::visit( + [this, &database](auto& path) { + OnResolveError(database, path, Ydb::StatusIds::SCHEME_ERROR, MakeResolveIssues(TStringBuilder() + << "Failed to resolve sequence " << path)); + }, + path); + return; + } + + auto& entry = req->ResultSet.at(0); + if (!entry.SequenceInfo || entry.SequenceInfo->Description.GetSequenceShard() == 0) { + std::visit( + [this, &database](auto& path) { + OnResolveError(database, path, Ydb::StatusIds::SCHEME_ERROR, MakeResolveIssues(TStringBuilder() + << "Failed to resolve sequence " << path)); + }, + path); + return; + } + + TResolveResult result{ + entry.TableId.PathId, + std::move(entry.SequenceInfo), + std::move(entry.SecurityObject), + }; + std::visit( + [this, &database, &result](auto& path) { + OnResolveResult(database, path, std::move(result)); + }, + path); + } + } // namespace NSequenceProxy } // namespace NKikimr diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp index d331fdef4ae..a7292ab30b7 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_ut.cpp @@ -103,6 +103,28 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge); } + void DropSequence(TTestActorRuntime& runtime, const TString& workingDir, const TString& name) { + auto edge = runtime.AllocateEdgeActor(0); + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + auto* tx = request->Record.MutableTransaction()->MutableModifyScheme(); + tx->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence); + tx->SetWorkingDir(workingDir); + auto* op = tx->MutableDrop(); + op->SetName(name); + runtime.Send(new IEventHandle(MakeTxProxyID(), edge, request.Release())); + + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(edge); + auto* msg = ev->Get(); + const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus()); + Y_VERIFY(status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); + + ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId(); + auto notifyReq = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); + notifyReq->Record.SetTxId(msg->Record.GetTxId()); + runtime.SendToPipe(schemeShardTabletId, edge, notifyReq.Release()); + runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(edge); + } + void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) { auto request = MakeHolder<TEvSequenceProxy::TEvNextVal>(path); runtime.Send(new IEventHandle(MakeSequenceProxyServiceID(), sender, request.Release())); @@ -111,7 +133,7 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { auto ev = runtime.GrabEdgeEventRethrow<TEvSequenceProxy::TEvNextValResult>(sender); auto* msg = ev->Get(); - Y_VERIFY(msg->Status == expectedStatus); + UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus); return msg->Status == Ydb::StatusIds::SUCCESS ? msg->Value : 0; } @@ -163,6 +185,29 @@ Y_UNIT_TEST_SUITE(SequenceProxy) { UNIT_ASSERT_C(allocateEvents < 7, "Too many TEvAllocateSequence events: " << allocateEvents); } + Y_UNIT_TEST(DropRecreate) { + TTenantTestRuntime runtime(MakeTenantTestConfig(false)); + StartSchemeCache(runtime); + + CreateSequence(runtime, "/dc-1", R"( + Name: "seq" + )"); + + i64 value = DoNextVal(runtime, "/dc-1/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 1); + + DropSequence(runtime, "/dc-1", "seq"); + + DoNextVal(runtime, "/dc-1/seq", Ydb::StatusIds::SCHEME_ERROR); + + CreateSequence(runtime, "/dc-1", R"( + Name: "seq" + )"); + + value = DoNextVal(runtime, "/dc-1/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 1); + } + } // Y_UNIT_TEST_SUITE(SequenceProxy) } // namespace NSequenceProxy |