diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-04-26 11:01:38 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-04-26 11:01:38 +0300 |
commit | 2efc3455fbb791702d7a066738895ffc561519d8 (patch) | |
tree | 8e06473558a7c4b2c0875c0ea423e14810aacf8e | |
parent | c321d56b715aa8ca196cf373c98b02b83251b8bb (diff) | |
download | ydb-2efc3455fbb791702d7a066738895ffc561519d8.tar.gz |
KIKIMR-14790: fix VERIFY in long tx
ref:eaa9e991e3ee16c37ccb03867228055d6c7aca1f
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 34 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 19 |
3 files changed, 35 insertions, 23 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 4da9d39957b..f7b68622edf 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -359,13 +359,13 @@ protected: LongTxId = longTxId; } - void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate* resp) { - if (resp->ErrorCount > 0) { + void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) { + if (resp.ErrorCount > 0) { // TODO: map to a correct error return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query"); } - auto& entry = resp->ResultSet[0]; + auto& entry = resp.ResultSet[0]; if (UserToken && entry.SecurityObject) { const ui32 access = NACLib::UpdateRow; @@ -527,8 +527,7 @@ private: for (auto& issue : issues) { RaiseIssue(issue); } - ReplyError(msg->Record.GetStatus()); - return PassAway(); + return ReplyError(msg->Record.GetStatus()); } ReplySuccess(); @@ -622,7 +621,8 @@ public: void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get(); - ProceedWithSchema(resp); + Y_VERIFY(resp); + ProceedWithSchema(*resp); } private: @@ -679,18 +679,21 @@ public: explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId, const TString& databaseName, const TString& path, - const NSchemeCache::TSchemeCacheNavigate& navigateResult, - std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues) + std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, + std::shared_ptr<arrow::RecordBatch> batch, + std::shared_ptr<NYql::TIssues> issues) : TBase(databaseName, path, TString(), longTxId, dedupId) , ReplyTo(replyTo) , NavigateResult(navigateResult) , Batch(batch) , Issues(issues) { + Y_VERIFY(Issues); } void Bootstrap() { - ProceedWithSchema(&NavigateResult); + Y_VERIFY(NavigateResult); + ProceedWithSchema(*NavigateResult); } protected: @@ -707,12 +710,12 @@ protected: } void RaiseIssue(const NYql::TIssue& issue) override { - Issues.AddIssue(issue); + Issues->AddIssue(issue); } void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) override { if (!message.empty()) { - Issues.AddIssue(NYql::TIssue(message)); + Issues->AddIssue(NYql::TIssue(message)); } this->Send(ReplyTo, new TEvents::TEvCompleted(0, status)); PassAway(); @@ -725,16 +728,17 @@ protected: private: const TActorId ReplyTo; - const NSchemeCache::TSchemeCacheNavigate& NavigateResult; + std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> NavigateResult; std::shared_ptr<arrow::RecordBatch> Batch; - NYql::TIssues& Issues; + std::shared_ptr<NYql::TIssues> Issues; }; TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId, const TString& dedupId, - const TString& databaseName, const TString& path, const NSchemeCache::TSchemeCacheNavigate& navigateResult, - std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues) + const TString& databaseName, const TString& path, + std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, + std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NYql::TIssues> issues) { return ctx.RegisterWithSameMailbox( new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues)); diff --git a/ydb/core/grpc_services/rpc_long_tx.h b/ydb/core/grpc_services/rpc_long_tx.h index 15edf2be380..2adad8bebbf 100644 --- a/ydb/core/grpc_services/rpc_long_tx.h +++ b/ydb/core/grpc_services/rpc_long_tx.h @@ -7,7 +7,8 @@ namespace NKikimr::NGRpcService { TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId, const TString& dedupId, - const TString& databaseName, const TString& path, const NSchemeCache::TSchemeCacheNavigate& navigateResult, - std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues); + const TString& databaseName, const TString& path, + std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, + std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NYql::TIssues> issues); } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 6971f78e626..07a84594999 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -111,7 +111,7 @@ private: bool Finished; TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult; - TAutoPtr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult; TSerializedCellVec MinKey; TSerializedCellVec MaxKey; TVector<NScheme::TTypeId> KeyColumnTypes; @@ -120,7 +120,7 @@ private: THashSet<TTabletId> ShardRepliesLeft; Ydb::StatusIds::StatusCode Status; TString ErrorMessage; - NYql::TIssues Issues; + std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>(); NLongTxService::TLongTxId LongTxId; NThreading::TFuture<Ydb::LongTx::WriteResponse> WriteBatchResult; @@ -187,7 +187,7 @@ public: protected: const NSchemeCache::TSchemeCacheNavigate* GetResolveNameResult() const { - return ResolveNamesResult.Get(); + return ResolveNamesResult.get(); } const TKeyDesc* GetKeyRange() const { @@ -278,6 +278,7 @@ private: bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) { Y_UNUSED(ctx); + Y_VERIFY(ResolveNamesResult); auto& entry = ResolveNamesResult->ResultSet.front(); @@ -481,7 +482,7 @@ private: Sprintf("Table '%s' is a system view. Bulk upsert is not supported.", GetTable().c_str()), ctx); } - ResolveNamesResult = ev->Get()->Request; + ResolveNamesResult.reset(ev->Get()->Request.Release()); bool makeYdbSchema = isOlapTable || (GetSourceType() != EUploadSource::ProtoValues); TString errorMessage; @@ -628,6 +629,8 @@ private: } std::vector<TString> GetOutputColumns(const NActors::TActorContext& ctx) { + Y_VERIFY(ResolveNamesResult); + if (ResolveNamesResult->ErrorCount > 0) { ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Failed to get table schema", ctx); return {}; @@ -674,11 +677,13 @@ private: } void WriteBatchInLongTx(const TActorContext& ctx) { + Y_VERIFY(ResolveNamesResult); Y_VERIFY(Batch); + TBase::Become(&TThis::StateWaitWriteBatchResult); TString dedupId = LongTxId.ToString(); // TODO: is this a proper dedup_id? NGRpcService::DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId, - GetDatabase(), GetTable(), *ResolveNamesResult, Batch, Issues); + GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues); } void RollbackLongTx(const TActorContext& ctx) { @@ -699,7 +704,8 @@ private: void HandleWriteBatchResult(TEvents::TEvCompleted::TPtr& ev, const TActorContext& ctx) { Ydb::StatusIds::StatusCode status = (Ydb::StatusIds::StatusCode)ev->Get()->Status; if (status != Ydb::StatusIds::SUCCESS) { - for (const auto& issue: Issues) { + Y_VERIFY(Issues); + for (const auto& issue: *Issues) { RaiseIssue(issue); } Finished = true; @@ -768,6 +774,7 @@ private: void ResolveShards(const NActors::TActorContext& ctx) { Y_VERIFY(!GetRows().empty()); + Y_VERIFY(ResolveNamesResult); auto& entry = ResolveNamesResult->ResultSet.front(); |