aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-10-24 22:38:35 +0300
committergvit <gvit@ydb.tech>2023-10-24 22:56:51 +0300
commit8d5a2acc69409f13d467bf8d6caba6f18f7c7253 (patch)
tree589291c4ae13e29a3da97dd482504e082e3d476e
parent0249dbb0588b5dfe3e4d32264d5d05edc11d614c (diff)
downloadydb-8d5a2acc69409f13d467bf8d6caba6f18f7c7253.tar.gz
refactor rpc alter table KIKIMR-18963
-rw-r--r--ydb/core/grpc_services/operation_helpers.cpp187
-rw-r--r--ydb/core/grpc_services/operation_helpers.h7
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp79
-rw-r--r--ydb/core/grpc_services/rpc_scheme_base.h123
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp234
-rw-r--r--ydb/core/kqp/gateway/local_rpc/helper.cpp37
-rw-r--r--ydb/core/kqp/gateway/local_rpc/helper.h8
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp85
-rw-r--r--ydb/core/protos/kqp_physical.proto13
-rw-r--r--ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/ydb_convert/table_description.cpp4
-rw-r--r--ydb/core/ydb_convert/tx_proxy_status.cpp75
-rw-r--r--ydb/core/ydb_convert/tx_proxy_status.h13
-rw-r--r--ydb/core/ydb_convert/ya.make1
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp4
18 files changed, 466 insertions, 408 deletions
diff --git a/ydb/core/grpc_services/operation_helpers.cpp b/ydb/core/grpc_services/operation_helpers.cpp
index 85ee5d5fc3..e8a459d505 100644
--- a/ydb/core/grpc_services/operation_helpers.cpp
+++ b/ydb/core/grpc_services/operation_helpers.cpp
@@ -22,8 +22,6 @@
namespace NKikimr {
namespace NGRpcService {
-using std::shared_ptr;
-
#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream)
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream)
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_PROXY, LogPrefix << stream)
@@ -104,191 +102,6 @@ bool TryGetId(const NOperationId::TOperationId& operationId, ui64& id) {
return id;
}
-///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-class TSSOpSubscriber : public TActorBootstrapped<TSSOpSubscriber> {
-public:
- TSSOpSubscriber(ui64 schemeshardId, ui64 txId, TString dbName, TOpType opType, shared_ptr<IRequestOpCtx>&& op)
- : SchemeshardId(schemeshardId)
- , TxId(txId)
- , DatabaseName(dbName)
- , OpType(opType)
- , Req(std::move(op))
- {
- StateFunc = &TSSOpSubscriber::DoSubscribe;
- }
-
- void Bootstrap(const TActorContext &ctx) {
- SSPipeClient = CreatePipeClient(SchemeshardId, ctx);
-
- LogPrefix = TStringBuilder() << "[SSOpSubscriber " << SelfId() << "] ";
-
- (this->*StateFunc)(ctx);
-
- Become(&TSSOpSubscriber::AwaitState);
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr&, const TActorContext&) {}
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext& ctx) {
- if (SSPipeClient) {
- NTabletPipe::CloseClient(ctx, SSPipeClient);
- SSPipeClient = TActorId();
- }
-
- LOG_E("Handle TEvTabletPipe::TEvClientDestroyed");
- if (AttemptsCounter > 3u) {
- TString error = "Too many attempts to create pipe to SS.";
- LOG_E(error);
- Req->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error));
- Req->ReplyWithYdbStatus(Ydb::StatusIds::OVERLOADED);
- Die(ctx);
- return;
- }
-
- NTabletPipe::TClientConfig config;
- config.RetryPolicy = {
- .RetryLimitCount = 5,
- .MinRetryTime = TDuration::MilliSeconds(50),
- .MaxRetryTime = TDuration::Seconds(10),
- .DoFirstRetryInstantly = false
- };
- SSPipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, SchemeshardId, config));
-
- AttemptsCounter++;
- (this->*StateFunc)(ctx);
- }
-
- void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&, const TActorContext&) {}
-
- void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr&, const TActorContext& ctx) {
- //TODO: Change SS API to get operation status just from TEvNotifyTxCompletionResult
- switch (OpType) {
- case TOpType::Common:
- NTabletPipe::CloseClient(ctx, SSPipeClient);
- Req->ReplyWithYdbStatus(Ydb::StatusIds::SUCCESS);
- Die(ctx);
- return;
- case TOpType::BuildIndex:
- StateFunc = &TSSOpSubscriber::GetBuildIndexStatus;
- break;
- case TOpType::Export:
- StateFunc = &TSSOpSubscriber::GetExportStatus;
- break;
- case TOpType::Import:
- StateFunc = &TSSOpSubscriber::GetImportStatus;
- break;
- default:
- NTabletPipe::CloseClient(ctx, SSPipeClient);
- Req->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
- Die(ctx);
- return;
- }
- (this->*StateFunc)(ctx);
- }
-
- STFUNC(AwaitState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
- HFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle);
- HFunc(TEvTabletPipe::TEvClientConnected, Handle);
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- HFunc(NSchemeShard::TEvExport::TEvGetExportResponse, Handle);
- HFunc(NSchemeShard::TEvImport::TEvGetImportResponse, Handle);
- HFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle);
- default:
- {
- Req->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
- PassAway();
- }
- }
- }
-
-private:
- void PassAway() override {
- if (SSPipeClient) {
- NTabletPipe::CloseClient(SelfId(), SSPipeClient);
- SSPipeClient = TActorId();
- }
- IActor::PassAway();
- }
-
- void DoSubscribe(const TActorContext& ctx) {
- auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
- request->Record.SetTxId(TxId);
- NTabletPipe::SendData(ctx, SSPipeClient, request.Release());
- }
-
- void GetBuildIndexStatus(const TActorContext& ctx) {
- auto request = new NSchemeShard::TEvIndexBuilder::TEvGetRequest(DatabaseName, TxId);
- NTabletPipe::SendData(ctx, SSPipeClient, request);
- }
-
- void GetExportStatus(const TActorContext& ctx) {
- auto request = new NSchemeShard::TEvExport::TEvGetExportRequest(DatabaseName, TxId);
- NTabletPipe::SendData(ctx, SSPipeClient, request);
- }
-
- void GetImportStatus(const TActorContext& ctx) {
- auto request = new NSchemeShard::TEvImport::TEvGetImportRequest(DatabaseName, TxId);
- NTabletPipe::SendData(ctx, SSPipeClient, request);
- }
-
- void Handle(NSchemeShard::TEvExport::TEvGetExportResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record.GetResponse();
-
- LOG_D("Handle TEvExport::TEvGetExportResponse"
- << ": record# " << record.ShortDebugString());
-
- auto op = TExportConv::ToOperation(record.GetEntry());
- Req->SendOperation(op);
- Die(ctx);
- }
-
- void Handle(NSchemeShard::TEvImport::TEvGetImportResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record.GetResponse();
-
- LOG_D("Handle TEvImport::TEvGetImportResponse"
- << ": record# " << record.ShortDebugString());
-
- auto op = TImportConv::ToOperation(record.GetEntry());
- Req->SendOperation(op);
- Die(ctx);
- }
-
- void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
-
- LOG_D("Handle TEvIndexBuilder::TEvGetResponse"
- << ": record# " << record.ShortDebugString());
-
- if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
- Req->ReplyWithYdbStatus(record.GetStatus());
- } else {
- Ydb::Operations::Operation op;
- ::NKikimr::NGRpcService::ToOperation(record.GetIndexBuild(), &op);
- Req->SendOperation(op);
- }
- Die(ctx);
- }
-
-private:
- const ui64 SchemeshardId;
- const ui64 TxId;
- const TString DatabaseName;
- const TOpType OpType;
- shared_ptr<IRequestOpCtx> Req;
- using TStateFunc = void (TSSOpSubscriber::*)(const TActorContext& ctx);
-
- TActorId SSPipeClient;
- ui64 AttemptsCounter = 0;
- TStateFunc StateFunc;
- TString LogPrefix;
-};
-
-void CreateSSOpSubscriber(ui64 schemeshardId, ui64 txId, const TString& dbName, TOpType opType, shared_ptr<IRequestOpCtx>&& op, const TActorContext& ctx) {
- ctx.Register(new TSSOpSubscriber(schemeshardId, txId, dbName, opType, std::move(op)));
-}
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/operation_helpers.h b/ydb/core/grpc_services/operation_helpers.h
index 845b635a96..a0fcaf40c5 100644
--- a/ydb/core/grpc_services/operation_helpers.h
+++ b/ydb/core/grpc_services/operation_helpers.h
@@ -24,13 +24,6 @@ Ydb::TOperationId ToOperationId(const NKikimrIndexBuilder::TIndexBuild& build);
void ToOperation(const NKikimrIndexBuilder::TIndexBuild& build, Ydb::Operations::Operation* operation);
bool TryGetId(const NOperationId::TOperationId& operationId, ui64& id);
-enum class TOpType {
- Common,
- BuildIndex,
- Export,
- Import
-};
-void CreateSSOpSubscriber(ui64 schemeshardId, ui64 txId, const TString& dbName, TOpType opType, std::shared_ptr<IRequestOpCtx>&& op, const TActorContext& ctx);
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index 160d7d959e..b18ade6d3f 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -59,18 +59,9 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>;
using EOp = NKikimr::EAlterOperationKind;
- void PassAway() override {
- if (SSPipeClient) {
- NTabletPipe::CloseClient(SelfId(), SSPipeClient);
- SSPipeClient = TActorId();
- }
- IActor::PassAway();
- }
-
public:
- TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED)
+ TAlterTableRPC(IRequestOpCtx* msg)
: TBase(msg)
- , Flags(flags)
{}
void Bootstrap(const TActorContext &ctx) {
@@ -109,7 +100,7 @@ public:
return;
case EOp::AddIndex:
- if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, Flags, code, error)) {
+ if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, 0, code, error)) {
Reply(code, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
return;
}
@@ -139,6 +130,7 @@ private:
HFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle);
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
HFunc(NSchemeShard::TEvIndexBuilder::TEvCreateResponse, Handle);
+ HFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle);
default: TBase::StateWork(ev);
}
}
@@ -200,7 +192,6 @@ private:
const auto* msg = ev->Get();
TxId = msg->TxId;
- TxProxyMon = msg->TxProxyMon;
LogPrefix = TStringBuilder() << "[AlterTableAddIndex " << SelfId() << " TxId# " << TxId << "] ";
Navigate(msg->Services.SchemeCache, ctx);
@@ -285,14 +276,13 @@ private:
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}
- SchemeshardId = domainInfo->ExtractSchemeShard();
- SSPipeClient = CreatePipeClient(SchemeshardId, ctx);
- SendAddIndexOpToSS(ctx);
+ SendAddIndexOpToSS(ctx, domainInfo->ExtractSchemeShard());
}
- void SendAddIndexOpToSS(const TActorContext& ctx) {
- auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(IndexBuildSettings));
- NTabletPipe::SendData(ctx, SSPipeClient, ev);
+ void SendAddIndexOpToSS(const TActorContext& ctx, ui64 schemeShardId) {
+ SetSchemeShardId(schemeShardId);
+ auto ev = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, DatabaseName, std::move(IndexBuildSettings));
+ ForwardToSchemeShard(ctx, std::move(ev));
}
void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev, const TActorContext& ctx) {
@@ -315,8 +305,7 @@ private:
if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) {
Reply(status, issuesProto, ctx);
} else if (GetOperationMode() == Ydb::Operations::OperationParams::SYNC) {
- CreateSSOpSubscriber(SchemeshardId, TxId, DatabaseName, TOpType::BuildIndex, std::move(Request_), ctx);
- Die(ctx);
+ DoSubscribe(ctx);
} else {
auto op = response.GetIndexBuild();
Ydb::Operations::Operation operation;
@@ -329,7 +318,40 @@ private:
}
}
- void AlterTable(const TActorContext &ctx) {
+ void GetIndexStatus(const TActorContext& ctx) {
+ auto request = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvGetRequest>(DatabaseName, TxId);
+ ForwardToSchemeShard(ctx, std::move(request));
+ }
+
+ void DoSubscribe(const TActorContext& ctx) {
+ auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(TxId);
+ ForwardToSchemeShard(ctx, std::move(request));
+ }
+
+ void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override {
+ if (OpType == EOp::AddIndex) {
+ GetIndexStatus(ctx);
+ } else {
+ TBase::OnNotifyTxCompletionResult(ev, ctx);
+ }
+ }
+
+ void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+
+ TXLOG_D("Handle TEvIndexBuilder::TEvGetResponse: record# " << record.ShortDebugString());
+
+ if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+ Request_->ReplyWithYdbStatus(record.GetStatus());
+ } else {
+ Ydb::Operations::Operation op;
+ ::NKikimr::NGRpcService::ToOperation(record.GetIndexBuild(), &op);
+ Request_->SendOperation(op);
+ }
+ Die(ctx);
+ }
+
+ void AlterTable(const TActorContext &ctx) {
const auto req = GetProtoRequest();
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
@@ -344,25 +366,14 @@ private:
ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
- void ReplyWithStatus(StatusIds::StatusCode status,
- const TActorContext &ctx) {
- Request_->ReplyWithYdbStatus(status);
- Die(ctx);
- }
-
ui64 TxId = 0;
- ui64 SchemeshardId = 0;
TString DatabaseName;
- TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
TString LogPrefix;
- TActorId SSPipeClient;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TPathId ResolvedPathId;
TTableProfiles Profiles;
EOp OpType;
NKikimrIndexBuilder::TIndexBuildSettings IndexBuildSettings;
-
- const ui64 Flags;
};
void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
@@ -374,10 +385,6 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
return new TAlterTableRPC(msg);
}
-IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags) {
- return new TAlterTableRPC(msg, flags);
-}
-
} // namespace NKikimr
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_scheme_base.h b/ydb/core/grpc_services/rpc_scheme_base.h
index 68b0eff848..d8d14c2147 100644
--- a/ydb/core/grpc_services/rpc_scheme_base.h
+++ b/ydb/core/grpc_services/rpc_scheme_base.h
@@ -3,10 +3,10 @@
#include "rpc_deferrable.h"
+#include <ydb/core/ydb_convert/tx_proxy_status.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
-namespace NKikimr {
-namespace NGRpcService {
+namespace NKikimr::NGRpcService {
template <typename TDerived, typename TRequest>
class TRpcSchemeRequestActor : public TRpcOperationRequestActor<TDerived, TRequest> {
@@ -47,9 +47,7 @@ protected:
}
}
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
-
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
TStringBuilder() << "Connection to tablet was lost."));
@@ -57,96 +55,61 @@ protected:
}
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
- const TEvTxUserProxy::TEvProposeTransactionStatus* msg = ev->Get();
- const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
+ TEvTxUserProxy::TEvProposeTransactionStatus* msg = ev->Get();
auto issueMessage = msg->Record.GetIssues();
- switch (status) {
- case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: {
- if (msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess ||
- msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists)
- {
- return this->ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, ctx);
- }
- break;
- }
- case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress: {
- ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId();
- IActor* pipeActor = NTabletPipe::CreateClient(ctx.SelfID, schemeShardTabletId);
- Y_ABORT_UNLESS(pipeActor);
- SchemePipeActorId_ = ctx.ExecutorThread.RegisterActor(pipeActor);
-
- auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
- request->Record.SetTxId(msg->Record.GetTxId());
- NTabletPipe::SendData(ctx, SchemePipeActorId_, request.Release());
- return;
- }
- case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest: {
- return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, issueMessage, ctx);
- }
- case TEvTxUserProxy::TResultStatus::AccessDenied: {
- return this->ReplyWithResult(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx);
- }
- case TEvTxUserProxy::TResultStatus::ProxyShardNotAvailable: {
- return this->ReplyWithResult(Ydb::StatusIds::UNAVAILABLE, issueMessage, ctx);
- }
- case TEvTxUserProxy::TResultStatus::ResolveError: {
- return this->ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx);
- }
- case TEvTxUserProxy::TResultStatus::ExecError: {
- switch (msg->Record.GetSchemeShardStatus()) {
- case NKikimrScheme::EStatus::StatusMultipleModifications: {
- return this->ReplyWithResult(Ydb::StatusIds::OVERLOADED, issueMessage, ctx);
- }
- case NKikimrScheme::EStatus::StatusInvalidParameter: {
- return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, issueMessage, ctx);
- }
- case NKikimrScheme::EStatus::StatusSchemeError:
- case NKikimrScheme::EStatus::StatusNameConflict:
-
- case NKikimrScheme::EStatus::StatusPathDoesNotExist: {
- return this->ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx);
- }
- case NKikimrScheme::EStatus::StatusQuotaExceeded: {
- // FIXME: clients may start aggressive retries when receiving 'overloaded'
- return this->ReplyWithResult(Ydb::StatusIds::OVERLOADED, issueMessage, ctx);
- }
- case NKikimrScheme::EStatus::StatusResourceExhausted:
- case NKikimrScheme::EStatus::StatusPreconditionFailed: {
- return this->ReplyWithResult(Ydb::StatusIds::PRECONDITION_FAILED, issueMessage, ctx);
- }
- default: {
- return this->ReplyWithResult(Ydb::StatusIds::GENERIC_ERROR, issueMessage, ctx);
- }
- }
- }
- default: {
- TStringStream str;
- str << "Got unknown TEvProposeTransactionStatus (" << status << ") response from TxProxy";
- const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str());
- auto tmp = issueMessage.Add();
- NYql::IssueToMessage(issue, tmp);
- return this->ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessage, ctx);
- }
+
+ Ydb::StatusIds::StatusCode ydbStatus = NKikimr::YdbStatusFromProxyStatus(msg);
+ if (!NKikimr::IsTxProxyInProgress(ydbStatus)) {
+ return this->ReplyWithResult(ydbStatus, issueMessage, ctx);
}
- return this->ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, issueMessage, ctx);
+
+ ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId();
+ auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(msg->Record.GetTxId());
+ SetSchemeShardId(schemeShardTabletId);
+ ForwardToSchemeShard(ctx, std::move(request));
+ return;
}
void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
- NTabletPipe::CloseClient(ctx, SchemePipeActorId_);
- return this->ReplyNotifyTxCompletionResult(ev, ctx);
+ return this->OnNotifyTxCompletionResult(ev, ctx);
}
void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&, const TActorContext&) {
}
- virtual void ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
+ virtual void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
return this->Reply(Ydb::StatusIds::SUCCESS, ctx);
}
+ void PassAway() override {
+ if (SchemePipeActorId_) {
+ NTabletPipe::CloseClient(this->SelfId(), SchemePipeActorId_);
+ }
+
+ TBase::PassAway();
+ }
+
+ void SetSchemeShardId(ui64 schemeShardTabletId) {
+ SchemeShardTabletId = schemeShardTabletId;
+ }
+
+ template<typename TEv>
+ void ForwardToSchemeShard(const TActorContext& ctx, std::unique_ptr<TEv>&& ev) {
+ if (!SchemePipeActorId_) {
+ Y_ABORT_UNLESS(SchemeShardTabletId);
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.RetryPolicy = {.RetryLimitCount = 3};
+ SchemePipeActorId_ = ctx.ExecutorThread.RegisterActor(NTabletPipe::CreateClient(ctx.SelfID, SchemeShardTabletId));
+ }
+
+ Y_ABORT_UNLESS(SchemePipeActorId_);
+ NTabletPipe::SendData(this->SelfId(), SchemePipeActorId_, ev.release());
+ }
+
private:
+ ui64 SchemeShardTabletId = 0;
TActorId SchemePipeActorId_;
};
-} // namespace NGRpcService
-} // namespace NKikimr
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 550fbfefe2..b40abaf3a5 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/gateway/local_rpc/helper.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
namespace NKikimr::NKqp {
@@ -15,6 +16,28 @@ using namespace NThreading;
namespace {
+
+static bool CheckAlterAccess(const NACLib::TUserToken& userToken, const NSchemeCache::TSchemeCacheNavigate* navigate) {
+ bool isDatabase = true; // first entry is always database
+
+ using TEntry = NSchemeCache::TSchemeCacheNavigate::TEntry;
+
+ for (const TEntry& entry : navigate->ResultSet) {
+ if (!entry.SecurityObject) {
+ continue;
+ }
+
+ const ui32 access = isDatabase ? NACLib::CreateDirectory | NACLib::CreateTable : NACLib::GenericRead | NACLib::GenericWrite;
+ if (!entry.SecurityObject->CheckAccess(access, userToken)) {
+ return false;
+ }
+
+ isDatabase = false;
+ }
+
+ return true;
+}
+
class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
struct TEvPrivate {
enum EEv {
@@ -46,6 +69,13 @@ public:
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(txAlloc);
}
+ void StartBuildOperation() {
+ const auto& schemeOp = PhyTx->GetSchemeOperation();
+ auto buildOp = schemeOp.GetBuildOperation();
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ Become(&TKqpSchemeExecuter::ExecuteState);
+ }
+
void Bootstrap() {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
@@ -91,26 +121,16 @@ public:
}
case NKqpProto::TKqpSchemeOperation::kAlterTable: {
- auto alter = schemeOp.GetAlterTable();
- TMaybe<TString> token;
- TMaybe<TString> type;
-
- if (UserToken) {
- token = UserToken->GetSerializedToken();
- }
-
- if (auto t = alter.GetType()) {
- type = t;
- }
-
- auto cb = GetAlterTableRespHandler();
- DoAlterTableSameMailbox(std::move(alter), std::move(cb),
- Database, token, type);
-
- Become(&TKqpSchemeExecuter::ExecuteState);
+ auto modifyScheme = schemeOp.GetAlterTable();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
return;
}
+ case NKqpProto::TKqpSchemeOperation::kBuildOperation: {
+ auto buildOp = schemeOp.GetBuildOperation();
+ return StartBuildOperation();
+ }
+
default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
@@ -147,6 +167,14 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvResult, HandleExecute);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(NSchemeShard::TEvIndexBuilder::TEvCreateResponse, Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
default:
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());
}
@@ -155,6 +183,163 @@ public:
}
}
+
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ const auto* msg = ev->Get();
+ TxId = msg->TxId;
+
+ Navigate(msg->Services.SchemeCache);
+ }
+
+ void Navigate(const TActorId& schemeCache) {
+ const auto& schemeOp = PhyTx->GetSchemeOperation();
+ auto buildOp = schemeOp.GetBuildOperation();
+ const auto& path = buildOp.source_path();
+
+ const auto paths = NKikimr::SplitPath(path);
+ if (paths.empty()) {
+ TString error = TStringBuilder() << "Failed to split table path " << path;
+ return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, NYql::TIssue(error));
+ }
+
+ auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+
+ request->DatabaseName = Database;
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.Path = ::NKikimr::SplitPath(path);
+ entry.RedirectRequired = false;
+
+ {
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
+ entry.Path = paths;
+ }
+
+ auto ev = std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release());
+ Send(schemeCache, ev.release());
+ }
+
+ void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&) {
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, errors# " << ev->Get()->Request.Get()->ErrorCount);
+
+ NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get();
+
+ if (resp->ErrorCount > 0 || resp->ResultSet.empty()) {
+ TStringBuilder builder;
+ builder << "Unable to navigate:";
+
+ for (const auto& entry : resp->ResultSet) {
+ if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ builder << " " << JoinPath(entry.Path) << " status: " << entry.Status;
+ }
+ }
+
+ TString error(builder);
+ LOG_E(error);
+ return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssue(error));
+ }
+
+ if (UserToken && !UserToken->GetSerializedToken().empty() && !CheckAlterAccess(*UserToken, resp)) {
+ LOG_E("Access check failed");
+ return ReplyErrorAndDie(Ydb::StatusIds::UNAUTHORIZED, NYql::TIssue("Unauthorized"));
+ }
+
+ auto domainInfo = resp->ResultSet.front().DomainInfo;
+ if (!domainInfo) {
+ LOG_E("Got empty domain info");
+ return ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssue("empty domain info"));
+ }
+
+ const auto& schemeOp = PhyTx->GetSchemeOperation();
+ auto buildOp = schemeOp.GetBuildOperation();
+ SetSchemeShardId(domainInfo->ExtractSchemeShard());
+ auto req = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, Database, buildOp);
+ ForwardToSchemeShard(std::move(req));
+ }
+
+ void PassAway() override {
+ if (SchemePipeActorId_) {
+ NTabletPipe::CloseClient(this->SelfId(), SchemePipeActorId_);
+ }
+
+ IActor::PassAway();
+ }
+
+ void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev) {
+ const auto& response = ev->Get()->Record;
+ const auto status = response.GetStatus();
+ auto issuesProto = response.GetIssues();
+
+ LOG_D("Handle TEvIndexBuilder::TEvCreateResponse " << response.ShortUtf8DebugString());
+
+ if (status == Ydb::StatusIds::SUCCESS) {
+ if (response.HasSchemeStatus() && response.GetSchemeStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) {
+ return ReplyErrorAndDie(status, &issuesProto);
+ } else {
+ DoSubscribe();
+ }
+ } else {
+ ReplyErrorAndDie(status, &issuesProto);
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ TStringBuilder() << "Tablet not available, status: " << (ui32)ev->Get()->Status));
+ return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issues);
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ TStringBuilder() << "Connection to tablet was lost."));
+ return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issues);
+ }
+
+ void GetIndexStatus() {
+ auto request = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvGetRequest>(Database, TxId);
+ ForwardToSchemeShard(std::move(request));
+ }
+
+ void DoSubscribe() {
+ auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(TxId);
+ ForwardToSchemeShard(std::move(request));
+ }
+
+ void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr&) {
+ GetIndexStatus();
+ }
+
+ void SetSchemeShardId(ui64 schemeShardTabletId) {
+ SchemeShardTabletId = schemeShardTabletId;
+ }
+
+ void Handle(NSchemeShard::TEvIndexBuilder::TEvGetResponse::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ LOG_D("Handle TEvIndexBuilder::TEvGetResponse: record# " << record.ShortDebugString());
+ return ReplyErrorAndDie(record.GetStatus(), record.MutableIssues());
+ }
+
+ template<typename TEv>
+ void ForwardToSchemeShard(std::unique_ptr<TEv>&& ev) {
+ if (!SchemePipeActorId_) {
+ Y_ABORT_UNLESS(SchemeShardTabletId);
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.RetryPolicy = {.RetryLimitCount = 3};
+ SchemePipeActorId_ = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), SchemeShardTabletId));
+ }
+
+ Y_ABORT_UNLESS(SchemePipeActorId_);
+ NTabletPipe::SendData(SelfId(), SchemePipeActorId_, ev.release());
+ }
+
void HandleExecute(TEvPrivate::TEvResult::TPtr& ev) {
auto& response = *ResponseEv->Record.MutableResponse();
@@ -176,18 +361,6 @@ public:
private:
- std::function<void(const Ydb::Table::AlterTableResponse& r)> GetAlterTableRespHandler() const {
- auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem;
- auto selfId = SelfId();
-
- return [actorSystem, selfId] (const Ydb::Table::AlterTableResponse& r) {
- auto ev = MakeHolder<TEvPrivate::TEvResult>();
-
- ev->Result = GenericResultFromSyncOperation(r.operation());
- actorSystem->Send(selfId, ev.Release());
- };
- }
-
void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> protoIssues;
IssuesToMessage(issues, &protoIssues);
@@ -243,6 +416,9 @@ private:
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
bool Temporary;
TString SessionId;
+ ui64 TxId = 0;
+ TActorId SchemePipeActorId_;
+ ui64 SchemeShardTabletId = 0;
};
} // namespace
diff --git a/ydb/core/kqp/gateway/local_rpc/helper.cpp b/ydb/core/kqp/gateway/local_rpc/helper.cpp
index 86a60a8546..e32d304aa3 100644
--- a/ydb/core/kqp/gateway/local_rpc/helper.cpp
+++ b/ydb/core/kqp/gateway/local_rpc/helper.cpp
@@ -6,36 +6,6 @@
namespace NKikimr {
-namespace NGRpcService {
-
-IActor* CreateExtAlterTableRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg, ui64 flags);
-
-class TExtendedAlterTableRequest : public Ydb::Table::AlterTableRequest {
-public:
- TExtendedAlterTableRequest(NKqpProto::TKqpSchemeOperation::TAlterTable&& alter)
- : Ydb::Table::AlterTableRequest(std::move(*alter.MutableReq()))
- , ExtendedFlags(alter.GetFlags())
- {}
-
- ui64 GetExtendedFlags() const {
- return ExtendedFlags;
- }
-private:
- const ui64 ExtendedFlags;
-};
-
-using TEvAlterTableRequest = NGRpcService::TGrpcRequestOperationCall<TExtendedAlterTableRequest,
- Ydb::Table::AlterTableResponse>;
-
-struct TEvPgAlterTableRequest : public TEvAlterTableRequest {
- static IActor* CreateRpcActor(IRequestOpCtx* ctx) {
- auto request = static_cast<const TExtendedAlterTableRequest*>(ctx->GetRequest());
- return CreateExtAlterTableRpcActor(ctx, request->GetExtendedFlags());
- }
-};
-
-} // NGRpcService
-
namespace NKqp {
using namespace NYql;
@@ -57,12 +27,5 @@ IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Operations::
}
}
-void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb,
- const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type)
-{
- NRpcService::DoLocalRpcSameMailbox<NGRpcService::TEvPgAlterTableRequest>(std::move(req), std::move(cb),
- database, token, type, TlsActivationContext->AsActorContext());
-}
-
}
}
diff --git a/ydb/core/kqp/gateway/local_rpc/helper.h b/ydb/core/kqp/gateway/local_rpc/helper.h
index 769a3fed64..aaededa245 100644
--- a/ydb/core/kqp/gateway/local_rpc/helper.h
+++ b/ydb/core/kqp/gateway/local_rpc/helper.h
@@ -2,14 +2,8 @@
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
-namespace NKikimr {
-namespace NKqp {
+namespace NKikimr::NKqp {
NYql::IKikimrGateway::TGenericResult GenericResultFromSyncOperation(const Ydb::Operations::Operation& op);
-using TAlterTableRespHandler = std::function<void(const Ydb::Table::AlterTableResponse& r)>;
-void DoAlterTableSameMailbox(NKqpProto::TKqpSchemeOperation::TAlterTable&& req, TAlterTableRespHandler&& cb,
- const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& type);
-
-}
}
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 12ddae56e0..568715885a 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -1,6 +1,7 @@
#include "kqp_host_impl.h"
#include <ydb/core/grpc_services/table_settings.h>
+#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/column_families.h>
namespace NKikimr::NKqp {
@@ -548,26 +549,76 @@ public:
{
CHECK_PREPARED_DDL(AlterTable);
- if (IsPrepare()) {
- auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
- auto& phyTx = *phyQuery.AddTransactions();
- phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
- auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable();
- alter->MutableReq()->Swap(&req);
- if (requestType) {
- alter->SetType(*requestType);
- }
-
- alter->SetFlags(flags);
+ if (!IsPrepare()) {
+ return Gateway->AlterTable(cluster, std::move(req), requestType, flags);
+ }
+ auto &phyQuery =
+ *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto &phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ auto promise = NewPromise<TGenericResult>();
+ const auto ops = GetAlterOperationKinds(&req);
+ if (ops.size() != 1) {
+ auto code = Ydb::StatusIds::BAD_REQUEST;
+ auto error = TStringBuilder() << "Unqualified alter table request.";
+ IKqpGateway::TGenericResult errResult;
+ errResult.AddIssue(NYql::TIssue(error));
+ errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+ promise.SetValue(errResult);
+ return promise.GetFuture();
+ }
- auto promise = NewPromise<TGenericResult>();
+ const auto opType = *ops.begin();
+ auto tablePromise = NewPromise<TGenericResult>();
+ if (opType == EAlterOperationKind::AddIndex) {
+ auto buildOp = phyTx.MutableSchemeOperation()->MutableBuildOperation();
+ Ydb::StatusIds::StatusCode code;
+ TString error;
+ if (!BuildAlterTableAddIndexRequest(&req, buildOp, flags, code, error)) {
+ IKqpGateway::TGenericResult errResult;
+ errResult.AddIssue(NYql::TIssue(error));
+ errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+ tablePromise.SetValue(errResult);
+ return tablePromise.GetFuture();
+ }
TGenericResult result;
result.SetSuccess();
- promise.SetValue(result);
- return promise.GetFuture();
- } else {
- return Gateway->AlterTable(cluster, std::move(req), requestType, flags);
+ tablePromise.SetValue(result);
+ return tablePromise.GetFuture();
}
+
+ auto profilesFuture = Gateway->GetTableProfiles();
+ auto sessionCtx = SessionCtx;
+ profilesFuture.Subscribe(
+ [tablePromise, sessionCtx, alterReq = std::move(req)](
+ const TFuture<IKqpGateway::TKqpTableProfilesResult> &future) mutable {
+ auto profilesResult = future.GetValue();
+ if (!profilesResult.Success()) {
+ tablePromise.SetValue(ResultFromIssues<TGenericResult>(
+ profilesResult.Status(), profilesResult.Issues()));
+ return;
+ }
+
+ auto &phyQuery =
+ *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto &phyTx = *phyQuery.AddTransactions();
+ auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable();
+ const TPathId invalidPathId;
+ Ydb::StatusIds::StatusCode code;
+ TString error;
+ if (!BuildAlterTableModifyScheme(&alterReq, alter, profilesResult.Profiles, invalidPathId, code, error)) {
+ IKqpGateway::TGenericResult errResult;
+ errResult.AddIssue(NYql::TIssue(error));
+ errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+ tablePromise.SetValue(errResult);
+ return;
+ }
+ TGenericResult result;
+ result.SetSuccess();
+ tablePromise.SetValue(result);
+ });
+
+ return tablePromise.GetFuture();
}
TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override {
@@ -599,6 +650,8 @@ public:
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+
+
phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx);
phyTx.MutableSchemeOperation()->MutableDropTable()->SetSuccessOnNotExist(settings.SuccessOnNotExist);
TGenericResult result;
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 59fd591e11..dac4414eda 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -10,6 +10,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
import "ydb/public/api/protos/ydb_value.proto";
import "ydb/public/api/protos/ydb_table.proto";
+import "ydb/core/protos/index_builder.proto";
message TKqpPhyExternalBinding {
}
@@ -377,15 +378,15 @@ message TKqpSchemeOperation {
FLAG_PG_MODE = 1; // set if pg compatible mode
FLAG_IF_NOT_EXISTS = 2; // set if IF_NOT_EXISTS modificator present
};
- message TAlterTable {
- Ydb.Table.AlterTableRequest Req = 1;
- string Type = 2;
- uint64 Flags = 3;
- }
+
oneof Operation {
NKikimrSchemeOp.TModifyScheme CreateTable = 1;
NKikimrSchemeOp.TModifyScheme DropTable = 2;
- TAlterTable AlterTable = 3;
+ NKikimrSchemeOp.TModifyScheme AlterTable = 3;
+ NKikimrIndexBuilder.TIndexBuildSettings BuildOperation = 4;
+
+ string Type = 9;
+ uint64 Flags = 10;
}
}
diff --git a/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt
index cb46f19a8e..be87db3480 100644
--- a/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/ydb_convert/CMakeLists.darwin-x86_64.txt
@@ -34,4 +34,5 @@ target_sources(ydb-core-ydb_convert PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp
)
diff --git a/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt b/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt
index 64a21db00c..0607942d3c 100644
--- a/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/ydb_convert/CMakeLists.linux-aarch64.txt
@@ -35,4 +35,5 @@ target_sources(ydb-core-ydb_convert PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp
)
diff --git a/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt
index 64a21db00c..0607942d3c 100644
--- a/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/ydb_convert/CMakeLists.linux-x86_64.txt
@@ -35,4 +35,5 @@ target_sources(ydb-core-ydb_convert PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp
)
diff --git a/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt b/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt
index cb46f19a8e..be87db3480 100644
--- a/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/ydb_convert/CMakeLists.windows-x86_64.txt
@@ -34,4 +34,5 @@ target_sources(ydb-core-ydb_convert PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_profiles.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/tx_proxy_status.cpp
)
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index bd020c8703..dc1f5e343a 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -332,7 +332,9 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
if (OpType == EAlterOperationKind::Attribute) {
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes);
- modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId);
+ if (resolvedPathId) {
+ modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId);
+ }
auto& alter = *modifyScheme->MutableAlterUserAttributes();
alter.SetPathName(name);
diff --git a/ydb/core/ydb_convert/tx_proxy_status.cpp b/ydb/core/ydb_convert/tx_proxy_status.cpp
new file mode 100644
index 0000000000..83767aa6f3
--- /dev/null
+++ b/ydb/core/ydb_convert/tx_proxy_status.cpp
@@ -0,0 +1,75 @@
+#include "tx_proxy_status.h"
+
+namespace NKikimr {
+
+
+bool IsTxProxyInProgress(const Ydb::StatusIds::StatusCode& code) {
+ return code == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+}
+
+Ydb::StatusIds::StatusCode YdbStatusFromProxyStatus(TEvTxUserProxy::TEvProposeTransactionStatus* msg)
+{
+ const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
+ auto issueMessage = msg->Record.GetIssues();
+
+ switch (status) {
+ case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: {
+ if (msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess || msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) {
+ return Ydb::StatusIds::SUCCESS;
+ }
+ break;
+ }
+ case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress: {
+ return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ }
+ case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::WrongRequest: {
+ return Ydb::StatusIds::BAD_REQUEST;
+ }
+ case TEvTxUserProxy::TResultStatus::AccessDenied: {
+ return Ydb::StatusIds::UNAUTHORIZED;
+ }
+ case TEvTxUserProxy::TResultStatus::ProxyShardNotAvailable: {
+ return Ydb::StatusIds::UNAVAILABLE;
+ }
+ case TEvTxUserProxy::TResultStatus::ResolveError: {
+ return Ydb::StatusIds::SCHEME_ERROR;
+ }
+ case TEvTxUserProxy::TResultStatus::ExecError: {
+ switch (msg->Record.GetSchemeShardStatus()) {
+ case NKikimrScheme::EStatus::StatusMultipleModifications: {
+ return Ydb::StatusIds::OVERLOADED;
+ }
+ case NKikimrScheme::EStatus::StatusInvalidParameter: {
+ return Ydb::StatusIds::BAD_REQUEST;
+ }
+ case NKikimrScheme::EStatus::StatusSchemeError:
+ case NKikimrScheme::EStatus::StatusNameConflict:
+ case NKikimrScheme::EStatus::StatusPathDoesNotExist: {
+ return Ydb::StatusIds::SCHEME_ERROR;
+ }
+ case NKikimrScheme::EStatus::StatusQuotaExceeded: {
+ return Ydb::StatusIds::OVERLOADED;
+ }
+ case NKikimrScheme::EStatus::StatusResourceExhausted:
+ case NKikimrScheme::EStatus::StatusPreconditionFailed: {
+ return Ydb::StatusIds::PRECONDITION_FAILED;
+ }
+ default: {
+ return Ydb::StatusIds::GENERIC_ERROR;
+ }
+ }
+ }
+ default: {
+ TStringStream str;
+ str << "Got unknown TEvProposeTransactionStatus (" << status << ") response from TxProxy";
+ const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str());
+ auto tmp = issueMessage.Add();
+ NYql::IssueToMessage(issue, tmp);
+ return Ydb::StatusIds::INTERNAL_ERROR;
+ }
+ }
+
+ return Ydb::StatusIds::INTERNAL_ERROR;
+}
+
+} // namespace NKikimr \ No newline at end of file
diff --git a/ydb/core/ydb_convert/tx_proxy_status.h b/ydb/core/ydb_convert/tx_proxy_status.h
new file mode 100644
index 0000000000..b76e342d90
--- /dev/null
+++ b/ydb/core/ydb_convert/tx_proxy_status.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+namespace NKikimr {
+
+// function returns status if we are ready to reply with a specific status to the client
+// and Ydb::StatusIds::StatusCode:: otherwise - when operation or transaction is in the progress.
+Ydb::StatusIds::StatusCode YdbStatusFromProxyStatus(TEvTxUserProxy::TEvProposeTransactionStatus* msg);
+
+bool IsTxProxyInProgress(const Ydb::StatusIds::StatusCode& code);
+
+} \ No newline at end of file
diff --git a/ydb/core/ydb_convert/ya.make b/ydb/core/ydb_convert/ya.make
index cf06ad50d3..3893704bbd 100644
--- a/ydb/core/ydb_convert/ya.make
+++ b/ydb/core/ydb_convert/ya.make
@@ -7,6 +7,7 @@ SRCS(
table_description.cpp
table_profiles.cpp
ydb_convert.cpp
+ tx_proxy_status.cpp
)
PEERDIR(
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 13f755790a..2d08c2bd5b 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1042,7 +1042,7 @@ namespace NKikimr::NDataStreams::V1 {
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo);
- void ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override;
+ void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override;
private:
TString ConsumerName;
@@ -1098,7 +1098,7 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TRegisterStreamConsumerActor::ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
+ void TRegisterStreamConsumerActor::OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
Ydb::DataStreams::V1::RegisterStreamConsumerResult result;
auto consumer = result.Mutableconsumer();