diff options
author | gvit <gvit@ydb.tech> | 2023-10-24 22:38:35 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-10-24 22:56:51 +0300 |
commit | 8d5a2acc69409f13d467bf8d6caba6f18f7c7253 (patch) | |
tree | 589291c4ae13e29a3da97dd482504e082e3d476e | |
parent | 0249dbb0588b5dfe3e4d32264d5d05edc11d614c (diff) | |
download | ydb-8d5a2acc69409f13d467bf8d6caba6f18f7c7253.tar.gz |
refactor rpc alter table KIKIMR-18963
-rw-r--r-- | ydb/core/grpc_services/operation_helpers.cpp | 187 | ||||
-rw-r--r-- | ydb/core/grpc_services/operation_helpers.h | 7 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_alter_table.cpp | 79 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_scheme_base.h | 123 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 234 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/local_rpc/helper.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/local_rpc/helper.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 85 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 13 | ||||
-rw-r--r-- | ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 4 | ||||
-rw-r--r-- | ydb/core/ydb_convert/tx_proxy_status.cpp | 75 | ||||
-rw-r--r-- | ydb/core/ydb_convert/tx_proxy_status.h | 13 | ||||
-rw-r--r-- | ydb/core/ydb_convert/ya.make | 1 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 4 |
18 files changed, 466 insertions, 408 deletions
diff --git a/ydb/core/grpc_services/operation_helpers.cpp b/ydb/core/grpc_services/operation_helpers.cpp index 85ee5d5fc3..e8a459d505 100644 --- a/ydb/core/grpc_services/operation_helpers.cpp +++ b/ydb/core/grpc_services/operation_helpers.cpp @@ -22,8 +22,6 @@ namespace NKikimr { namespace NGRpcService { -using std::shared_ptr; - #define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream) #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream) #define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream) @@ -104,191 +102,6 @@ bool TryGetId(const NOperationId::TOperationId& operationId, ui64& id) { return id; } -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class TSSOpSubscriber : public TActorBootstrapped<TSSOpSubscriber> { -public: - TSSOpSubscriber(ui64 schemeshardId, ui64 txId, TString dbName, TOpType opType, shared_ptr<IRequestOpCtx>&& op) - : SchemeshardId(schemeshardId) - , TxId(txId) - , DatabaseName(dbName) - , OpType(opType) - , Req(std::move(op)) - { - StateFunc = &TSSOpSubscriber::DoSubscribe; - } - - void Bootstrap(const TActorContext &ctx) { - SSPipeClient = CreatePipeClient(SchemeshardId, ctx); - - LogPrefix = TStringBuilder() << "[SSOpSubscriber " << SelfId() << "] "; - - (this->*StateFunc)(ctx); - - Become(&TSSOpSubscriber::AwaitState); - } - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr&, const TActorContext&) {} - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext& ctx) { - if (SSPipeClient) { - NTabletPipe::CloseClient(ctx, SSPipeClient); - SSPipeClient = TActorId(); - } - - LOG_E("Handle TEvTabletPipe::TEvClientDestroyed"); - if (AttemptsCounter > 3u) { - TString error = "Too many attempts to create pipe to SS."; - LOG_E(error); - Req->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error)); - Req->ReplyWithYdbStatus(Ydb::StatusIds::OVERLOADED); - Die(ctx); - return; - } - - NTabletPipe::TClientConfig config; - config.RetryPolicy = { - .RetryLimitCount = 5, - .MinRetryTime = TDuration::MilliSeconds(50), - .MaxRetryTime = TDuration::Seconds(10), - .DoFirstRetryInstantly = false - }; - SSPipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, SchemeshardId, config)); - - AttemptsCounter++; - (this->*StateFunc)(ctx); - } - - void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&, const TActorContext&) {} - - void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr&, const TActorContext& ctx) { - //TODO: Change SS API to get operation status just from TEvNotifyTxCompletionResult - switch (OpType) { - case TOpType::Common: - NTabletPipe::CloseClient(ctx, SSPipeClient); - Req->ReplyWithYdbStatus(Ydb::StatusIds::SUCCESS); - Die(ctx); - return; - case TOpType::BuildIndex: - StateFunc = &TSSOpSubscriber::GetBuildIndexStatus; - break; - case TOpType::Export: - StateFunc = &TSSOpSubscriber::GetExportStatus; - break; - case TOpType::Import: - StateFunc = &TSSOpSubscriber::GetImportStatus; - break; - default: - NTabletPipe::CloseClient(ctx, SSPipeClient); - Req->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR); - Die(ctx); - return; - } - (this->*StateFunc)(ctx); - } - - STFUNC(AwaitState) { - switch (ev->GetTypeRewrite()) { - HFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle); - HFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle); - HFunc(TEvTabletPipe::TEvClientConnected, Handle); - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - HFunc(NSchemeShard::TEvExport::TEvGetExportResponse, Handle); - HFunc(NSchemeShard::TEvImport::TEvGetImportResponse, Handle); - HFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle); - default: - { - Req->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR); - PassAway(); - } - } - } - -private: - void PassAway() override { - if (SSPipeClient) { - NTabletPipe::CloseClient(SelfId(), SSPipeClient); - SSPipeClient = TActorId(); - } - IActor::PassAway(); - } - - void DoSubscribe(const TActorContext& ctx) { - auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); - request->Record.SetTxId(TxId); - NTabletPipe::SendData(ctx, SSPipeClient, request.Release()); - } - - void GetBuildIndexStatus(const TActorContext& ctx) { - auto request = new NSchemeShard::TEvIndexBuilder::TEvGetRequest(DatabaseName, TxId); - NTabletPipe::SendData(ctx, SSPipeClient, request); - } - - void GetExportStatus(const TActorContext& ctx) { - auto request = new NSchemeShard::TEvExport::TEvGetExportRequest(DatabaseName, TxId); - NTabletPipe::SendData(ctx, SSPipeClient, request); - } - - void GetImportStatus(const TActorContext& ctx) { - auto request = new NSchemeShard::TEvImport::TEvGetImportRequest(DatabaseName, TxId); - NTabletPipe::SendData(ctx, SSPipeClient, request); - } - - void Handle(NSchemeShard::TEvExport::TEvGetExportResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record.GetResponse(); - - LOG_D("Handle TEvExport::TEvGetExportResponse" - << ": record# " << record.ShortDebugString()); - - auto op = TExportConv::ToOperation(record.GetEntry()); - Req->SendOperation(op); - Die(ctx); - } - - void Handle(NSchemeShard::TEvImport::TEvGetImportResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record.GetResponse(); - - LOG_D("Handle TEvImport::TEvGetImportResponse" - << ": record# " << record.ShortDebugString()); - - auto op = TImportConv::ToOperation(record.GetEntry()); - Req->SendOperation(op); - Die(ctx); - } - - void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - LOG_D("Handle TEvIndexBuilder::TEvGetResponse" - << ": record# " << record.ShortDebugString()); - - if (record.GetStatus() != Ydb::StatusIds::SUCCESS) { - Req->ReplyWithYdbStatus(record.GetStatus()); - } else { - Ydb::Operations::Operation op; - ::NKikimr::NGRpcService::ToOperation(record.GetIndexBuild(), &op); - Req->SendOperation(op); - } - Die(ctx); - } - -private: - const ui64 SchemeshardId; - const ui64 TxId; - const TString DatabaseName; - const TOpType OpType; - shared_ptr<IRequestOpCtx> Req; - using TStateFunc = void (TSSOpSubscriber::*)(const TActorContext& ctx); - - TActorId SSPipeClient; - ui64 AttemptsCounter = 0; - TStateFunc StateFunc; - TString LogPrefix; -}; - -void CreateSSOpSubscriber(ui64 schemeshardId, ui64 txId, const TString& dbName, TOpType opType, shared_ptr<IRequestOpCtx>&& op, const TActorContext& ctx) { - ctx.Register(new TSSOpSubscriber(schemeshardId, txId, dbName, opType, std::move(op))); -} } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/operation_helpers.h b/ydb/core/grpc_services/operation_helpers.h index 845b635a96..a0fcaf40c5 100644 --- a/ydb/core/grpc_services/operation_helpers.h +++ b/ydb/core/grpc_services/operation_helpers.h @@ -24,13 +24,6 @@ Ydb::TOperationId ToOperationId(const NKikimrIndexBuilder::TIndexBuild& build); void ToOperation(const NKikimrIndexBuilder::TIndexBuild& build, Ydb::Operations::Operation* operation); bool TryGetId(const NOperationId::TOperationId& operationId, ui64& id); -enum class TOpType { - Common, - BuildIndex, - Export, - Import -}; -void CreateSSOpSubscriber(ui64 schemeshardId, ui64 txId, const TString& dbName, TOpType opType, std::shared_ptr<IRequestOpCtx>&& op, const TActorContext& ctx); } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index 160d7d959e..b18ade6d3f 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -59,18 +59,9 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>; using EOp = NKikimr::EAlterOperationKind; - void PassAway() override { - if (SSPipeClient) { - NTabletPipe::CloseClient(SelfId(), SSPipeClient); - SSPipeClient = TActorId(); - } - IActor::PassAway(); - } - public: - TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED) + TAlterTableRPC(IRequestOpCtx* msg) : TBase(msg) - , Flags(flags) {} void Bootstrap(const TActorContext &ctx) { @@ -109,7 +100,7 @@ public: return; case EOp::AddIndex: - if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, Flags, code, error)) { + if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, 0, code, error)) { Reply(code, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); return; } @@ -139,6 +130,7 @@ private: HFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle); HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); HFunc(NSchemeShard::TEvIndexBuilder::TEvCreateResponse, Handle); + HFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle); default: TBase::StateWork(ev); } } @@ -200,7 +192,6 @@ private: const auto* msg = ev->Get(); TxId = msg->TxId; - TxProxyMon = msg->TxProxyMon; LogPrefix = TStringBuilder() << "[AlterTableAddIndex " << SelfId() << " TxId# " << TxId << "] "; Navigate(msg->Services.SchemeCache, ctx); @@ -285,14 +276,13 @@ private: return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); } - SchemeshardId = domainInfo->ExtractSchemeShard(); - SSPipeClient = CreatePipeClient(SchemeshardId, ctx); - SendAddIndexOpToSS(ctx); + SendAddIndexOpToSS(ctx, domainInfo->ExtractSchemeShard()); } - void SendAddIndexOpToSS(const TActorContext& ctx) { - auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(IndexBuildSettings)); - NTabletPipe::SendData(ctx, SSPipeClient, ev); + void SendAddIndexOpToSS(const TActorContext& ctx, ui64 schemeShardId) { + SetSchemeShardId(schemeShardId); + auto ev = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, DatabaseName, std::move(IndexBuildSettings)); + ForwardToSchemeShard(ctx, std::move(ev)); } void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev, const TActorContext& ctx) { @@ -315,8 +305,7 @@ private: if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) { Reply(status, issuesProto, ctx); } else if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) { - CreateSSOpSubscriber(SchemeshardId, TxId, DatabaseName, TOpType::BuildIndex, std::move(Request_), ctx); - Die(ctx); + DoSubscribe(ctx); } else { auto op = response.GetIndexBuild(); Ydb::Operations::Operation operation; @@ -329,7 +318,40 @@ private: } } - void AlterTable(const TActorContext &ctx) { + void GetIndexStatus(const TActorContext& ctx) { + auto request = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvGetRequest>(DatabaseName, TxId); + ForwardToSchemeShard(ctx, std::move(request)); + } + + void DoSubscribe(const TActorContext& ctx) { + auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(TxId); + ForwardToSchemeShard(ctx, std::move(request)); + } + + void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override { + if (OpType == EOp::AddIndex) { + GetIndexStatus(ctx); + } else { + TBase::OnNotifyTxCompletionResult(ev, ctx); + } + } + + void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + + TXLOG_D("Handle TEvIndexBuilder::TEvGetResponse: record# " << record.ShortDebugString()); + + if (record.GetStatus() != Ydb::StatusIds::SUCCESS) { + Request_->ReplyWithYdbStatus(record.GetStatus()); + } else { + Ydb::Operations::Operation op; + ::NKikimr::NGRpcService::ToOperation(record.GetIndexBuild(), &op); + Request_->SendOperation(op); + } + Die(ctx); + } + + void AlterTable(const TActorContext &ctx) { const auto req = GetProtoRequest(); std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme(); @@ -344,25 +366,14 @@ private: ctx.Send(MakeTxProxyID(), proposeRequest.release()); } - void ReplyWithStatus(StatusIds::StatusCode status, - const TActorContext &ctx) { - Request_->ReplyWithYdbStatus(status); - Die(ctx); - } - ui64 TxId = 0; - ui64 SchemeshardId = 0; TString DatabaseName; - TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon; TString LogPrefix; - TActorId SSPipeClient; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TPathId ResolvedPathId; TTableProfiles Profiles; EOp OpType; NKikimrIndexBuilder::TIndexBuildSettings IndexBuildSettings; - - const ui64 Flags; }; void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { @@ -374,10 +385,6 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt return new TAlterTableRPC(msg); } -IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags) { - return new TAlterTableRPC(msg, flags); -} - } // namespace NKikimr } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_scheme_base.h b/ydb/core/grpc_services/rpc_scheme_base.h index 68b0eff848..d8d14c2147 100644 --- a/ydb/core/grpc_services/rpc_scheme_base.h +++ b/ydb/core/grpc_services/rpc_scheme_base.h @@ -3,10 +3,10 @@ #include "rpc_deferrable.h" +#include <ydb/core/ydb_convert/tx_proxy_status.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> -namespace NKikimr { -namespace NGRpcService { +namespace NKikimr::NGRpcService { template <typename TDerived, typename TRequest> class TRpcSchemeRequestActor : public TRpcOperationRequestActor<TDerived, TRequest> { @@ -47,9 +47,7 @@ protected: } } - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection to tablet was lost.")); @@ -57,96 +55,61 @@ protected: } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) { - const TEvTxUserProxy::TEvProposeTransactionStatus* msg = ev->Get(); - const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus()); + TEvTxUserProxy::TEvProposeTransactionStatus* msg = ev->Get(); auto issueMessage = msg->Record.GetIssues(); - switch (status) { - case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: { - if (msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess || - msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) - { - return this->ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, ctx); - } - break; - } - case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress: { - ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId(); - IActor* pipeActor = NTabletPipe::CreateClient(ctx.SelfID, schemeShardTabletId); - Y_ABORT_UNLESS(pipeActor); - SchemePipeActorId_ = ctx.ExecutorThread.RegisterActor(pipeActor); - - auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); - request->Record.SetTxId(msg->Record.GetTxId()); - NTabletPipe::SendData(ctx, SchemePipeActorId_, request.Release()); - return; - } - case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest: { - return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, issueMessage, ctx); - } - case TEvTxUserProxy::TResultStatus::AccessDenied: { - return this->ReplyWithResult(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx); - } - case TEvTxUserProxy::TResultStatus::ProxyShardNotAvailable: { - return this->ReplyWithResult(Ydb::StatusIds::UNAVAILABLE, issueMessage, ctx); - } - case TEvTxUserProxy::TResultStatus::ResolveError: { - return this->ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx); - } - case TEvTxUserProxy::TResultStatus::ExecError: { - switch (msg->Record.GetSchemeShardStatus()) { - case NKikimrScheme::EStatus::StatusMultipleModifications: { - return this->ReplyWithResult(Ydb::StatusIds::OVERLOADED, issueMessage, ctx); - } - case NKikimrScheme::EStatus::StatusInvalidParameter: { - return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, issueMessage, ctx); - } - case NKikimrScheme::EStatus::StatusSchemeError: - case NKikimrScheme::EStatus::StatusNameConflict: - - case NKikimrScheme::EStatus::StatusPathDoesNotExist: { - return this->ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx); - } - case NKikimrScheme::EStatus::StatusQuotaExceeded: { - // FIXME: clients may start aggressive retries when receiving 'overloaded' - return this->ReplyWithResult(Ydb::StatusIds::OVERLOADED, issueMessage, ctx); - } - case NKikimrScheme::EStatus::StatusResourceExhausted: - case NKikimrScheme::EStatus::StatusPreconditionFailed: { - return this->ReplyWithResult(Ydb::StatusIds::PRECONDITION_FAILED, issueMessage, ctx); - } - default: { - return this->ReplyWithResult(Ydb::StatusIds::GENERIC_ERROR, issueMessage, ctx); - } - } - } - default: { - TStringStream str; - str << "Got unknown TEvProposeTransactionStatus (" << status << ") response from TxProxy"; - const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str()); - auto tmp = issueMessage.Add(); - NYql::IssueToMessage(issue, tmp); - return this->ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessage, ctx); - } + + Ydb::StatusIds::StatusCode ydbStatus = NKikimr::YdbStatusFromProxyStatus(msg); + if (!NKikimr::IsTxProxyInProgress(ydbStatus)) { + return this->ReplyWithResult(ydbStatus, issueMessage, ctx); } - return this->ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessage, ctx); + + ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId(); + auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(msg->Record.GetTxId()); + SetSchemeShardId(schemeShardTabletId); + ForwardToSchemeShard(ctx, std::move(request)); + return; } void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) { - NTabletPipe::CloseClient(ctx, SchemePipeActorId_); - return this->ReplyNotifyTxCompletionResult(ev, ctx); + return this->OnNotifyTxCompletionResult(ev, ctx); } void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&, const TActorContext&) { } - virtual void ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) { + virtual void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ev); return this->Reply(Ydb::StatusIds::SUCCESS, ctx); } + void PassAway() override { + if (SchemePipeActorId_) { + NTabletPipe::CloseClient(this->SelfId(), SchemePipeActorId_); + } + + TBase::PassAway(); + } + + void SetSchemeShardId(ui64 schemeShardTabletId) { + SchemeShardTabletId = schemeShardTabletId; + } + + template<typename TEv> + void ForwardToSchemeShard(const TActorContext& ctx, std::unique_ptr<TEv>&& ev) { + if (!SchemePipeActorId_) { + Y_ABORT_UNLESS(SchemeShardTabletId); + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = {.RetryLimitCount = 3}; + SchemePipeActorId_ = ctx.ExecutorThread.RegisterActor(NTabletPipe::CreateClient(ctx.SelfID, SchemeShardTabletId)); + } + + Y_ABORT_UNLESS(SchemePipeActorId_); + NTabletPipe::SendData(this->SelfId(), SchemePipeActorId_, ev.release()); + } + private: + ui64 SchemeShardTabletId = 0; TActorId SchemePipeActorId_; }; -} // namespace NGRpcService -} // namespace NKikimr +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 550fbfefe2..b40abaf3a5 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -4,6 +4,7 @@ #include <ydb/core/kqp/gateway/local_rpc/helper.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/kqp/session_actor/kqp_worker_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_build_index.h> namespace NKikimr::NKqp { @@ -15,6 +16,28 @@ using namespace NThreading; namespace { + +static bool CheckAlterAccess(const NACLib::TUserToken& userToken, const NSchemeCache::TSchemeCacheNavigate* navigate) { + bool isDatabase = true; // first entry is always database + + using TEntry = NSchemeCache::TSchemeCacheNavigate::TEntry; + + for (const TEntry& entry : navigate->ResultSet) { + if (!entry.SecurityObject) { + continue; + } + + const ui32 access = isDatabase ? NACLib::CreateDirectory | NACLib::CreateTable : NACLib::GenericRead | NACLib::GenericWrite; + if (!entry.SecurityObject->CheckAccess(access, userToken)) { + return false; + } + + isDatabase = false; + } + + return true; +} + class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> { struct TEvPrivate { enum EEv { @@ -46,6 +69,13 @@ public: ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(txAlloc); } + void StartBuildOperation() { + const auto& schemeOp = PhyTx->GetSchemeOperation(); + auto buildOp = schemeOp.GetBuildOperation(); + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + Become(&TKqpSchemeExecuter::ExecuteState); + } + void Bootstrap() { using TRequest = TEvTxUserProxy::TEvProposeTransaction; @@ -91,26 +121,16 @@ public: } case NKqpProto::TKqpSchemeOperation::kAlterTable: { - auto alter = schemeOp.GetAlterTable(); - TMaybe<TString> token; - TMaybe<TString> type; - - if (UserToken) { - token = UserToken->GetSerializedToken(); - } - - if (auto t = alter.GetType()) { - type = t; - } - - auto cb = GetAlterTableRespHandler(); - DoAlterTableSameMailbox(std::move(alter), std::move(cb), - Database, token, type); - - Become(&TKqpSchemeExecuter::ExecuteState); + auto modifyScheme = schemeOp.GetAlterTable(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); return; } + case NKqpProto::TKqpSchemeOperation::kBuildOperation: { + auto buildOp = schemeOp.GetBuildOperation(); + return StartBuildOperation(); + } + default: InternalError(TStringBuilder() << "Unexpected scheme operation: " << (ui32) schemeOp.GetOperationCase()); @@ -147,6 +167,14 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvPrivate::TEvResult, HandleExecute); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(NSchemeShard::TEvIndexBuilder::TEvCreateResponse, Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle); + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle); + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle); default: UnexpectedEvent("ExecuteState", ev->GetTypeRewrite()); } @@ -155,6 +183,163 @@ public: } } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + const auto* msg = ev->Get(); + TxId = msg->TxId; + + Navigate(msg->Services.SchemeCache); + } + + void Navigate(const TActorId& schemeCache) { + const auto& schemeOp = PhyTx->GetSchemeOperation(); + auto buildOp = schemeOp.GetBuildOperation(); + const auto& path = buildOp.source_path(); + + const auto paths = NKikimr::SplitPath(path); + if (paths.empty()) { + TString error = TStringBuilder() << "Failed to split table path " << path; + return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, NYql::TIssue(error)); + } + + auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); + + request->DatabaseName = Database; + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.Path = ::NKikimr::SplitPath(path); + entry.RedirectRequired = false; + + { + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; + entry.Path = paths; + } + + auto ev = std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release()); + Send(schemeCache, ev.release()); + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&) { + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, errors# " << ev->Get()->Request.Get()->ErrorCount); + + NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get(); + + if (resp->ErrorCount > 0 || resp->ResultSet.empty()) { + TStringBuilder builder; + builder << "Unable to navigate:"; + + for (const auto& entry : resp->ResultSet) { + if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + builder << " " << JoinPath(entry.Path) << " status: " << entry.Status; + } + } + + TString error(builder); + LOG_E(error); + return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssue(error)); + } + + if (UserToken && !UserToken->GetSerializedToken().empty() && !CheckAlterAccess(*UserToken, resp)) { + LOG_E("Access check failed"); + return ReplyErrorAndDie(Ydb::StatusIds::UNAUTHORIZED, NYql::TIssue("Unauthorized")); + } + + auto domainInfo = resp->ResultSet.front().DomainInfo; + if (!domainInfo) { + LOG_E("Got empty domain info"); + return ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssue("empty domain info")); + } + + const auto& schemeOp = PhyTx->GetSchemeOperation(); + auto buildOp = schemeOp.GetBuildOperation(); + SetSchemeShardId(domainInfo->ExtractSchemeShard()); + auto req = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, Database, buildOp); + ForwardToSchemeShard(std::move(req)); + } + + void PassAway() override { + if (SchemePipeActorId_) { + NTabletPipe::CloseClient(this->SelfId(), SchemePipeActorId_); + } + + IActor::PassAway(); + } + + void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev) { + const auto& response = ev->Get()->Record; + const auto status = response.GetStatus(); + auto issuesProto = response.GetIssues(); + + LOG_D("Handle TEvIndexBuilder::TEvCreateResponse " << response.ShortUtf8DebugString()); + + if (status == Ydb::StatusIds::SUCCESS) { + if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) { + return ReplyErrorAndDie(status, &issuesProto); + } else { + DoSubscribe(); + } + } else { + ReplyErrorAndDie(status, &issuesProto); + } + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev) { + if (ev->Get()->Status != NKikimrProto::OK) { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + TStringBuilder() << "Tablet not available, status: " << (ui32)ev->Get()->Status)); + return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issues); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + TStringBuilder() << "Connection to tablet was lost.")); + return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issues); + } + + void GetIndexStatus() { + auto request = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvGetRequest>(Database, TxId); + ForwardToSchemeShard(std::move(request)); + } + + void DoSubscribe() { + auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(TxId); + ForwardToSchemeShard(std::move(request)); + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr&) { + GetIndexStatus(); + } + + void SetSchemeShardId(ui64 schemeShardTabletId) { + SchemeShardTabletId = schemeShardTabletId; + } + + void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev) { + auto& record = ev->Get()->Record; + LOG_D("Handle TEvIndexBuilder::TEvGetResponse: record# " << record.ShortDebugString()); + return ReplyErrorAndDie(record.GetStatus(), record.MutableIssues()); + } + + template<typename TEv> + void ForwardToSchemeShard(std::unique_ptr<TEv>&& ev) { + if (!SchemePipeActorId_) { + Y_ABORT_UNLESS(SchemeShardTabletId); + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = {.RetryLimitCount = 3}; + SchemePipeActorId_ = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), SchemeShardTabletId)); + } + + Y_ABORT_UNLESS(SchemePipeActorId_); + NTabletPipe::SendData(SelfId(), SchemePipeActorId_, ev.release()); + } + void HandleExecute(TEvPrivate::TEvResult::TPtr& ev) { auto& response = *ResponseEv->Record.MutableResponse(); @@ -176,18 +361,6 @@ public: private: - std::function<void(const Ydb::Table::AlterTableResponse& r)> GetAlterTableRespHandler() const { - auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem; - auto selfId = SelfId(); - - return [actorSystem, selfId] (const Ydb::Table::AlterTableResponse& r) { - auto ev = MakeHolder<TEvPrivate::TEvResult>(); - - ev->Result = GenericResultFromSyncOperation(r.operation()); - actorSystem->Send(selfId, ev.Release()); - }; - } - void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> protoIssues; IssuesToMessage(issues, &protoIssues); @@ -243,6 +416,9 @@ private: std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv; bool Temporary; TString SessionId; + ui64 TxId = 0; + TActorId SchemePipeActorId_; + ui64 SchemeShardTabletId = 0; }; } // namespace diff --git a/ydb/core/kqp/gateway/local_rpc/helper.cpp b/ydb/core/kqp/gateway/local_rpc/helper.cpp index 86a60a8546..e32d304aa3 100644 --- a/ydb/core/kqp/gateway/local_rpc/helper.cpp +++ b/ydb/core/kqp/gateway/local_rpc/helper.cpp @@ -6,36 +6,6 @@ namespace NKikimr { -namespace NGRpcService { - -IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags); - -class TExtendedAlterTableRequest : public Ydb::Table::AlterTableRequest { -public: - TExtendedAlterTableRequest(NKqpProto::TKqpSchemeOperation::TAlterTable&& alter) - : Ydb::Table::AlterTableRequest(std::move(*alter.MutableReq())) - , ExtendedFlags(alter.GetFlags()) - {} - - ui64 GetExtendedFlags() const { - return ExtendedFlags; - } -private: - const ui64 ExtendedFlags; -}; - -using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<TExtendedAlterTableRequest, - Ydb::Table::AlterTableResponse>; - -struct TEvPgAlterTableRequest : public TEvAlterTableRequest { - static IActor* CreateRpcActor(IRequestOpCtx* ctx) { - auto request = static_cast<const TExtendedAlterTableRequest*>(ctx->GetRequest()); - return CreateExtAlterTableRpcActor(ctx, request->GetExtendedFlags()); - } -}; - -} // NGRpcService - namespace NKqp { using namespace NYql; @@ -57,12 +27,5 @@ IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Operations:: } } -void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb, - const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type) -{ - NRpcService::DoLocalRpcSameMailbox<NGRpcService::TEvPgAlterTableRequest>(std::move(req), std::move(cb), - database, token, type, TlsActivationContext->AsActorContext()); -} - } } diff --git a/ydb/core/kqp/gateway/local_rpc/helper.h b/ydb/core/kqp/gateway/local_rpc/helper.h index 769a3fed64..aaededa245 100644 --- a/ydb/core/kqp/gateway/local_rpc/helper.h +++ b/ydb/core/kqp/gateway/local_rpc/helper.h @@ -2,14 +2,8 @@ #include <ydb/core/kqp/provider/yql_kikimr_gateway.h> -namespace NKikimr { -namespace NKqp { +namespace NKikimr::NKqp { NYql::IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Ydb::Operations::Operation& op); -using TAlterTableRespHandler = std::function<void(const Ydb::Table::AlterTableResponse& r)>; -void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb, - const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type); - -} } diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 12ddae56e0..568715885a 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1,6 +1,7 @@ #include "kqp_host_impl.h" #include <ydb/core/grpc_services/table_settings.h> +#include <ydb/core/ydb_convert/table_description.h> #include <ydb/core/ydb_convert/column_families.h> namespace NKikimr::NKqp { @@ -548,26 +549,76 @@ public: { CHECK_PREPARED_DDL(AlterTable); - if (IsPrepare()) { - auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); - auto& phyTx = *phyQuery.AddTransactions(); - phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); - auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable(); - alter->MutableReq()->Swap(&req); - if (requestType) { - alter->SetType(*requestType); - } - - alter->SetFlags(flags); + if (!IsPrepare()) { + return Gateway->AlterTable(cluster, std::move(req), requestType, flags); + } + auto &phyQuery = + *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto &phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + auto promise = NewPromise<TGenericResult>(); + const auto ops = GetAlterOperationKinds(&req); + if (ops.size() != 1) { + auto code = Ydb::StatusIds::BAD_REQUEST; + auto error = TStringBuilder() << "Unqualified alter table request."; + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + promise.SetValue(errResult); + return promise.GetFuture(); + } - auto promise = NewPromise<TGenericResult>(); + const auto opType = *ops.begin(); + auto tablePromise = NewPromise<TGenericResult>(); + if (opType == EAlterOperationKind::AddIndex) { + auto buildOp = phyTx.MutableSchemeOperation()->MutableBuildOperation(); + Ydb::StatusIds::StatusCode code; + TString error; + if (!BuildAlterTableAddIndexRequest(&req, buildOp, flags, code, error)) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + tablePromise.SetValue(errResult); + return tablePromise.GetFuture(); + } TGenericResult result; result.SetSuccess(); - promise.SetValue(result); - return promise.GetFuture(); - } else { - return Gateway->AlterTable(cluster, std::move(req), requestType, flags); + tablePromise.SetValue(result); + return tablePromise.GetFuture(); } + + auto profilesFuture = Gateway->GetTableProfiles(); + auto sessionCtx = SessionCtx; + profilesFuture.Subscribe( + [tablePromise, sessionCtx, alterReq = std::move(req)]( + const TFuture<IKqpGateway::TKqpTableProfilesResult> &future) mutable { + auto profilesResult = future.GetValue(); + if (!profilesResult.Success()) { + tablePromise.SetValue(ResultFromIssues<TGenericResult>( + profilesResult.Status(), profilesResult.Issues())); + return; + } + + auto &phyQuery = + *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto &phyTx = *phyQuery.AddTransactions(); + auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable(); + const TPathId invalidPathId; + Ydb::StatusIds::StatusCode code; + TString error; + if (!BuildAlterTableModifyScheme(&alterReq, alter, profilesResult.Profiles, invalidPathId, code, error)) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + tablePromise.SetValue(errResult); + return; + } + TGenericResult result; + result.SetSuccess(); + tablePromise.SetValue(result); + }); + + return tablePromise.GetFuture(); } TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override { @@ -599,6 +650,8 @@ public: auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); auto& phyTx = *phyQuery.AddTransactions(); phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + + phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx); phyTx.MutableSchemeOperation()->MutableDropTable()->SetSuccessOnNotExist(settings.SuccessOnNotExist); TGenericResult result; diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 59fd591e11..dac4414eda 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -10,6 +10,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "ydb/public/api/protos/ydb_value.proto"; import "ydb/public/api/protos/ydb_table.proto"; +import "ydb/core/protos/index_builder.proto"; message TKqpPhyExternalBinding { } @@ -377,15 +378,15 @@ message TKqpSchemeOperation { FLAG_PG_MODE = 1; // set if pg compatible mode FLAG_IF_NOT_EXISTS = 2; // set if IF_NOT_EXISTS modificator present }; - message TAlterTable { - Ydb.Table.AlterTableRequest Req = 1; - string Type = 2; - uint64 Flags = 3; - } + oneof Operation { NKikimrSchemeOp.TModifyScheme CreateTable = 1; NKikimrSchemeOp.TModifyScheme DropTable = 2; - TAlterTable AlterTable = 3; + NKikimrSchemeOp.TModifyScheme AlterTable = 3; + NKikimrIndexBuilder.TIndexBuildSettings BuildOperation = 4; + + string Type = 9; + uint64 Flags = 10; } } diff --git a/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt index cb46f19a8e..be87db3480 100644 --- a/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt @@ -34,4 +34,5 @@ target_sources(ydb-core-ydb_convert PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp ) diff --git a/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt b/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt index 64a21db00c..0607942d3c 100644 --- a/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt +++ b/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt @@ -35,4 +35,5 @@ target_sources(ydb-core-ydb_convert PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp ) diff --git a/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt index 64a21db00c..0607942d3c 100644 --- a/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt +++ b/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt @@ -35,4 +35,5 @@ target_sources(ydb-core-ydb_convert PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp ) diff --git a/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt index cb46f19a8e..be87db3480 100644 --- a/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt +++ b/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt @@ -34,4 +34,5 @@ target_sources(ydb-core-ydb_convert PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp ) diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index bd020c8703..dc1f5e343a 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -332,7 +332,9 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki if (OpType == EAlterOperationKind::Attribute) { modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes); - modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId); + if (resolvedPathId) { + modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId); + } auto& alter = *modifyScheme->MutableAlterUserAttributes(); alter.SetPathName(name); diff --git a/ydb/core/ydb_convert/tx_proxy_status.cpp b/ydb/core/ydb_convert/tx_proxy_status.cpp new file mode 100644 index 0000000000..83767aa6f3 --- /dev/null +++ b/ydb/core/ydb_convert/tx_proxy_status.cpp @@ -0,0 +1,75 @@ +#include "tx_proxy_status.h" + +namespace NKikimr { + + +bool IsTxProxyInProgress(const Ydb::StatusIds::StatusCode& code) { + return code == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; +} + +Ydb::StatusIds::StatusCode YdbStatusFromProxyStatus(TEvTxUserProxy::TEvProposeTransactionStatus* msg) +{ + const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus()); + auto issueMessage = msg->Record.GetIssues(); + + switch (status) { + case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: { + if (msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess || msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) { + return Ydb::StatusIds::SUCCESS; + } + break; + } + case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress: { + return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + } + case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest: { + return Ydb::StatusIds::BAD_REQUEST; + } + case TEvTxUserProxy::TResultStatus::AccessDenied: { + return Ydb::StatusIds::UNAUTHORIZED; + } + case TEvTxUserProxy::TResultStatus::ProxyShardNotAvailable: { + return Ydb::StatusIds::UNAVAILABLE; + } + case TEvTxUserProxy::TResultStatus::ResolveError: { + return Ydb::StatusIds::SCHEME_ERROR; + } + case TEvTxUserProxy::TResultStatus::ExecError: { + switch (msg->Record.GetSchemeShardStatus()) { + case NKikimrScheme::EStatus::StatusMultipleModifications: { + return Ydb::StatusIds::OVERLOADED; + } + case NKikimrScheme::EStatus::StatusInvalidParameter: { + return Ydb::StatusIds::BAD_REQUEST; + } + case NKikimrScheme::EStatus::StatusSchemeError: + case NKikimrScheme::EStatus::StatusNameConflict: + case NKikimrScheme::EStatus::StatusPathDoesNotExist: { + return Ydb::StatusIds::SCHEME_ERROR; + } + case NKikimrScheme::EStatus::StatusQuotaExceeded: { + return Ydb::StatusIds::OVERLOADED; + } + case NKikimrScheme::EStatus::StatusResourceExhausted: + case NKikimrScheme::EStatus::StatusPreconditionFailed: { + return Ydb::StatusIds::PRECONDITION_FAILED; + } + default: { + return Ydb::StatusIds::GENERIC_ERROR; + } + } + } + default: { + TStringStream str; + str << "Got unknown TEvProposeTransactionStatus (" << status << ") response from TxProxy"; + const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str()); + auto tmp = issueMessage.Add(); + NYql::IssueToMessage(issue, tmp); + return Ydb::StatusIds::INTERNAL_ERROR; + } + } + + return Ydb::StatusIds::INTERNAL_ERROR; +} + +} // namespace NKikimr
\ No newline at end of file diff --git a/ydb/core/ydb_convert/tx_proxy_status.h b/ydb/core/ydb_convert/tx_proxy_status.h new file mode 100644 index 0000000000..b76e342d90 --- /dev/null +++ b/ydb/core/ydb_convert/tx_proxy_status.h @@ -0,0 +1,13 @@ +#pragma once + +#include <ydb/core/tx/tx_proxy/proxy.h> + +namespace NKikimr { + +// function returns status if we are ready to reply with a specific status to the client +// and Ydb::StatusIds::StatusCode:: otherwise - when operation or transaction is in the progress. +Ydb::StatusIds::StatusCode YdbStatusFromProxyStatus(TEvTxUserProxy::TEvProposeTransactionStatus* msg); + +bool IsTxProxyInProgress(const Ydb::StatusIds::StatusCode& code); + +}
\ No newline at end of file diff --git a/ydb/core/ydb_convert/ya.make b/ydb/core/ydb_convert/ya.make index cf06ad50d3..3893704bbd 100644 --- a/ydb/core/ydb_convert/ya.make +++ b/ydb/core/ydb_convert/ya.make @@ -7,6 +7,7 @@ SRCS( table_description.cpp table_profiles.cpp ydb_convert.cpp + tx_proxy_status.cpp ) PEERDIR( diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 13f755790a..2d08c2bd5b 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1042,7 +1042,7 @@ namespace NKikimr::NDataStreams::V1 { NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo); - void ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override; + void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override; private: TString ConsumerName; @@ -1098,7 +1098,7 @@ namespace NKikimr::NDataStreams::V1 { } } - void TRegisterStreamConsumerActor::ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) { + void TRegisterStreamConsumerActor::OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ev); Ydb::DataStreams::V1::RegisterStreamConsumerResult result; auto consumer = result.Mutableconsumer(); |