aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-04-26 11:01:38 +0300
committerArtem Zuikov <chertus@gmail.com>2022-04-26 11:01:38 +0300
commit2efc3455fbb791702d7a066738895ffc561519d8 (patch)
tree8e06473558a7c4b2c0875c0ea423e14810aacf8e
parentc321d56b715aa8ca196cf373c98b02b83251b8bb (diff)
downloadydb-2efc3455fbb791702d7a066738895ffc561519d8.tar.gz
KIKIMR-14790: fix VERIFY in long tx
ref:eaa9e991e3ee16c37ccb03867228055d6c7aca1f
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp34
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.h5
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h19
3 files changed, 35 insertions, 23 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 4da9d39957b..f7b68622edf 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -359,13 +359,13 @@ protected:
LongTxId = longTxId;
}
- void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate* resp) {
- if (resp->ErrorCount > 0) {
+ void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) {
+ if (resp.ErrorCount > 0) {
// TODO: map to a correct error
return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query");
}
- auto& entry = resp->ResultSet[0];
+ auto& entry = resp.ResultSet[0];
if (UserToken && entry.SecurityObject) {
const ui32 access = NACLib::UpdateRow;
@@ -527,8 +527,7 @@ private:
for (auto& issue : issues) {
RaiseIssue(issue);
}
- ReplyError(msg->Record.GetStatus());
- return PassAway();
+ return ReplyError(msg->Record.GetStatus());
}
ReplySuccess();
@@ -622,7 +621,8 @@ public:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get();
- ProceedWithSchema(resp);
+ Y_VERIFY(resp);
+ ProceedWithSchema(*resp);
}
private:
@@ -679,18 +679,21 @@ public:
explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId,
const TString& databaseName, const TString& path,
- const NSchemeCache::TSchemeCacheNavigate& navigateResult,
- std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues)
+ std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult,
+ std::shared_ptr<arrow::RecordBatch> batch,
+ std::shared_ptr<NYql::TIssues> issues)
: TBase(databaseName, path, TString(), longTxId, dedupId)
, ReplyTo(replyTo)
, NavigateResult(navigateResult)
, Batch(batch)
, Issues(issues)
{
+ Y_VERIFY(Issues);
}
void Bootstrap() {
- ProceedWithSchema(&NavigateResult);
+ Y_VERIFY(NavigateResult);
+ ProceedWithSchema(*NavigateResult);
}
protected:
@@ -707,12 +710,12 @@ protected:
}
void RaiseIssue(const NYql::TIssue& issue) override {
- Issues.AddIssue(issue);
+ Issues->AddIssue(issue);
}
void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) override {
if (!message.empty()) {
- Issues.AddIssue(NYql::TIssue(message));
+ Issues->AddIssue(NYql::TIssue(message));
}
this->Send(ReplyTo, new TEvents::TEvCompleted(0, status));
PassAway();
@@ -725,16 +728,17 @@ protected:
private:
const TActorId ReplyTo;
- const NSchemeCache::TSchemeCacheNavigate& NavigateResult;
+ std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> NavigateResult;
std::shared_ptr<arrow::RecordBatch> Batch;
- NYql::TIssues& Issues;
+ std::shared_ptr<NYql::TIssues> Issues;
};
TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo,
const NLongTxService::TLongTxId& longTxId, const TString& dedupId,
- const TString& databaseName, const TString& path, const NSchemeCache::TSchemeCacheNavigate& navigateResult,
- std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues)
+ const TString& databaseName, const TString& path,
+ std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult,
+ std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NYql::TIssues> issues)
{
return ctx.RegisterWithSameMailbox(
new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues));
diff --git a/ydb/core/grpc_services/rpc_long_tx.h b/ydb/core/grpc_services/rpc_long_tx.h
index 15edf2be380..2adad8bebbf 100644
--- a/ydb/core/grpc_services/rpc_long_tx.h
+++ b/ydb/core/grpc_services/rpc_long_tx.h
@@ -7,7 +7,8 @@ namespace NKikimr::NGRpcService {
TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo,
const NLongTxService::TLongTxId& longTxId, const TString& dedupId,
- const TString& databaseName, const TString& path, const NSchemeCache::TSchemeCacheNavigate& navigateResult,
- std::shared_ptr<arrow::RecordBatch> batch, NYql::TIssues& issues);
+ const TString& databaseName, const TString& path,
+ std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult,
+ std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NYql::TIssues> issues);
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 6971f78e626..07a84594999 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -111,7 +111,7 @@ private:
bool Finished;
TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult;
- TAutoPtr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult;
TSerializedCellVec MinKey;
TSerializedCellVec MaxKey;
TVector<NScheme::TTypeId> KeyColumnTypes;
@@ -120,7 +120,7 @@ private:
THashSet<TTabletId> ShardRepliesLeft;
Ydb::StatusIds::StatusCode Status;
TString ErrorMessage;
- NYql::TIssues Issues;
+ std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
NLongTxService::TLongTxId LongTxId;
NThreading::TFuture<Ydb::LongTx::WriteResponse> WriteBatchResult;
@@ -187,7 +187,7 @@ public:
protected:
const NSchemeCache::TSchemeCacheNavigate* GetResolveNameResult() const {
- return ResolveNamesResult.Get();
+ return ResolveNamesResult.get();
}
const TKeyDesc* GetKeyRange() const {
@@ -278,6 +278,7 @@ private:
bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) {
Y_UNUSED(ctx);
+ Y_VERIFY(ResolveNamesResult);
auto& entry = ResolveNamesResult->ResultSet.front();
@@ -481,7 +482,7 @@ private:
Sprintf("Table '%s' is a system view. Bulk upsert is not supported.", GetTable().c_str()), ctx);
}
- ResolveNamesResult = ev->Get()->Request;
+ ResolveNamesResult.reset(ev->Get()->Request.Release());
bool makeYdbSchema = isOlapTable || (GetSourceType() != EUploadSource::ProtoValues);
TString errorMessage;
@@ -628,6 +629,8 @@ private:
}
std::vector<TString> GetOutputColumns(const NActors::TActorContext& ctx) {
+ Y_VERIFY(ResolveNamesResult);
+
if (ResolveNamesResult->ErrorCount > 0) {
ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Failed to get table schema", ctx);
return {};
@@ -674,11 +677,13 @@ private:
}
void WriteBatchInLongTx(const TActorContext& ctx) {
+ Y_VERIFY(ResolveNamesResult);
Y_VERIFY(Batch);
+
TBase::Become(&TThis::StateWaitWriteBatchResult);
TString dedupId = LongTxId.ToString(); // TODO: is this a proper dedup_id?
NGRpcService::DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId,
- GetDatabase(), GetTable(), *ResolveNamesResult, Batch, Issues);
+ GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues);
}
void RollbackLongTx(const TActorContext& ctx) {
@@ -699,7 +704,8 @@ private:
void HandleWriteBatchResult(TEvents::TEvCompleted::TPtr& ev, const TActorContext& ctx) {
Ydb::StatusIds::StatusCode status = (Ydb::StatusIds::StatusCode)ev->Get()->Status;
if (status != Ydb::StatusIds::SUCCESS) {
- for (const auto& issue: Issues) {
+ Y_VERIFY(Issues);
+ for (const auto& issue: *Issues) {
RaiseIssue(issue);
}
Finished = true;
@@ -768,6 +774,7 @@ private:
void ResolveShards(const NActors::TActorContext& ctx) {
Y_VERIFY(!GetRows().empty());
+ Y_VERIFY(ResolveNamesResult);
auto& entry = ResolveNamesResult->ResultSet.front();