diff options
author | Daniil Demin <deminds@ydb.tech> | 2025-02-13 20:22:39 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-13 17:22:39 +0000 |
commit | 68fb3935e206a006c98b5268401bea04ff5bd475 (patch) | |
tree | 363be595003404569d46768ca136c65c230b29ce | |
parent | 9b57d61fea2a64aec2b2bffed57f8ecc3197eb7c (diff) | |
download | ydb-68fb3935e206a006c98b5268401bea04ff5bd475.tar.gz |
External Data Sources: RPC implementation for the describe methods + ydb scheme describe support (#14509)
-rw-r--r-- | ydb/core/grpc_services/rpc_describe_external_data_source.cpp | 91 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_describe_external_table.cpp | 91 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_table.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/jaeger_tracing/request_discriminator.cpp | 2 | ||||
-rw-r--r-- | ydb/core/jaeger_tracing/request_discriminator.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp | 34 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp | 8 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table.cpp | 2 |
11 files changed, 242 insertions, 0 deletions
diff --git a/ydb/core/grpc_services/rpc_describe_external_data_source.cpp b/ydb/core/grpc_services/rpc_describe_external_data_source.cpp new file mode 100644 index 0000000000..caa5814f49 --- /dev/null +++ b/ydb/core/grpc_services/rpc_describe_external_data_source.cpp @@ -0,0 +1,91 @@ +#include "rpc_scheme_base.h" +#include "service_table.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/ydb_table.pb.h> + +namespace NKikimr::NGRpcService { + +using namespace NActors; +using namespace NYql; + +using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall< + Ydb::Table::DescribeExternalDataSourceRequest, + Ydb::Table::DescribeExternalDataSourceResponse +>; + +class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest> { + using TBase = TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest>; + +public: + + using TBase::TBase; + + void Bootstrap() { + DescribeScheme(); + } + +private: + + void DescribeScheme() { + auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>(); + SetAuthToken(ev, *Request_); + SetDatabase(ev.get(), *Request_); + ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path()); + + Send(MakeTxProxyID(), ev.release()); + Become(&TDescribeExternalDataSourceRPC::StateDescribeScheme); + } + + STATEFN(StateDescribeScheme) { + switch (ev->GetTypeRewrite()) { + HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); + default: + return TBase::StateWork(ev); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->GetRecord(); + const auto& pathDescription = record.GetPathDescription(); + + if (record.HasReason()) { + Request_->RaiseIssue(TIssue(record.GetReason())); + } + + switch (record.GetStatus()) { + case NKikimrScheme::StatusSuccess: { + if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalDataSource) { + Request_->RaiseIssue(TIssue( + TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType() + )); + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + return ReplyWithResult( + Ydb::StatusIds::SUCCESS, + Ydb::Table::DescribeExternalDataSourceResult(), // to do: convert private protobuf to public + ctx + ); + } + case NKikimrScheme::StatusPathDoesNotExist: + case NKikimrScheme::StatusSchemeError: + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + + case NKikimrScheme::StatusAccessDenied: + return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx); + + case NKikimrScheme::StatusNotAvailable: + return Reply(Ydb::StatusIds::UNAVAILABLE, ctx); + + default: + return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx); + } + } +}; + +void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { + f.RegisterActor(new TDescribeExternalDataSourceRPC(p.release())); +} + +} diff --git a/ydb/core/grpc_services/rpc_describe_external_table.cpp b/ydb/core/grpc_services/rpc_describe_external_table.cpp new file mode 100644 index 0000000000..68c26bad01 --- /dev/null +++ b/ydb/core/grpc_services/rpc_describe_external_table.cpp @@ -0,0 +1,91 @@ +#include "rpc_scheme_base.h" +#include "service_table.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/ydb_table.pb.h> + +namespace NKikimr::NGRpcService { + +using namespace NActors; +using namespace NYql; + +using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall< + Ydb::Table::DescribeExternalTableRequest, + Ydb::Table::DescribeExternalTableResponse +>; + +class TDescribeExternalTableRPC : public TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest> { + using TBase = TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest>; + +public: + + using TBase::TBase; + + void Bootstrap() { + DescribeScheme(); + } + +private: + + void DescribeScheme() { + auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>(); + SetAuthToken(ev, *Request_); + SetDatabase(ev.get(), *Request_); + ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path()); + + Send(MakeTxProxyID(), ev.release()); + Become(&TDescribeExternalTableRPC::StateDescribeScheme); + } + + STATEFN(StateDescribeScheme) { + switch (ev->GetTypeRewrite()) { + HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); + default: + return TBase::StateWork(ev); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->GetRecord(); + const auto& pathDescription = record.GetPathDescription(); + + if (record.HasReason()) { + Request_->RaiseIssue(TIssue(record.GetReason())); + } + + switch (record.GetStatus()) { + case NKikimrScheme::StatusSuccess: { + if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalTable) { + Request_->RaiseIssue(TIssue( + TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType() + )); + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + return ReplyWithResult( + Ydb::StatusIds::SUCCESS, + Ydb::Table::DescribeExternalTableResult(), // to do: convert private proto to public + ctx + ); + } + case NKikimrScheme::StatusPathDoesNotExist: + case NKikimrScheme::StatusSchemeError: + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + + case NKikimrScheme::StatusAccessDenied: + return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx); + + case NKikimrScheme::StatusNotAvailable: + return Reply(Ydb::StatusIds::UNAVAILABLE, ctx); + + default: + return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx); + } + } +}; + +void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { + f.RegisterActor(new TDescribeExternalTableRPC(p.release())); +} + +} diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h index 23053b1b01..e2cc73e1bc 100644 --- a/ydb/core/grpc_services/service_table.h +++ b/ydb/core/grpc_services/service_table.h @@ -16,6 +16,8 @@ void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvide void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index bd7d0a7377..be849cc604 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -37,6 +37,8 @@ SRCS( rpc_describe_path.cpp rpc_describe_table.cpp rpc_describe_table_options.cpp + rpc_describe_external_data_source.cpp + rpc_describe_external_table.cpp rpc_drop_coordination_node.cpp rpc_drop_table.cpp rpc_discovery.cpp diff --git a/ydb/core/jaeger_tracing/request_discriminator.cpp b/ydb/core/jaeger_tracing/request_discriminator.cpp index f2298a7781..73f46bec13 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.cpp +++ b/ydb/core/jaeger_tracing/request_discriminator.cpp @@ -42,6 +42,8 @@ extern const THashMap<TStringBuf, ERequestType> NameToRequestType = { {"Table.StreamExecuteScanQuery", ERequestType::TABLE_STREAMEXECUTESCANQUERY}, {"Table.StreamReadTable", ERequestType::TABLE_STREAMREADTABLE}, {"Table.ReadRows", ERequestType::TABLE_READROWS}, + {"Table.DescribeExternalDataSource", ERequestType::TABLE_DESCRIBEEXTERNALDATASOURCE}, + {"Table.DescribeExternalTable", ERequestType::TABLE_DESCRIBEEXTERNALTABLE}, {"Query.ExecuteQuery", ERequestType::QUERY_EXECUTEQUERY}, {"Query.ExecuteScript", ERequestType::QUERY_EXECUTESCRIPT}, diff --git a/ydb/core/jaeger_tracing/request_discriminator.h b/ydb/core/jaeger_tracing/request_discriminator.h index fac53d9ea3..c02f08281c 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.h +++ b/ydb/core/jaeger_tracing/request_discriminator.h @@ -47,6 +47,8 @@ enum class ERequestType: size_t { TABLE_STREAMEXECUTESCANQUERY, TABLE_STREAMREADTABLE, TABLE_READROWS, + TABLE_DESCRIBEEXTERNALDATASOURCE, + TABLE_DESCRIBEEXTERNALTABLE, QUERY_EXECUTEQUERY, QUERY_EXECUTESCRIPT, diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index ffe64c89d8..eff8b0528b 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -283,6 +283,10 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib return DescribeReplication(driver); case NScheme::ESchemeEntryType::View: return DescribeView(driver); + case NScheme::ESchemeEntryType::ExternalDataSource: + return DescribeExternalDataSource(driver); + case NScheme::ESchemeEntryType::ExternalTable: + return DescribeExternalTable(driver); default: return DescribeEntryDefault(entry); } @@ -615,6 +619,36 @@ int TCommandDescribe::DescribeView(const TDriver& driver) { return PrintDescription(this, OutputFormat, result, &TCommandDescribe::PrintViewResponsePretty); } +int TCommandDescribe::PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& description) const { + // to do + return EXIT_SUCCESS; +} + +int TCommandDescribe::DescribeExternalDataSource(const TDriver& driver) { + NTable::TTableClient client(driver); + const auto sessionResult = client.CreateSession().ExtractValueSync(); + NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult); + const auto description = sessionResult.GetSession().DescribeExternalDataSource(Path).ExtractValueSync(); + NStatusHelpers::ThrowOnErrorOrPrintIssues(description); + + return PrintDescription(this, OutputFormat, description.GetExternalDataSourceDescription(), &TCommandDescribe::PrintExternalDataSourceResponsePretty); +} + +int TCommandDescribe::PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& description) const { + // to do + return EXIT_SUCCESS; +} + +int TCommandDescribe::DescribeExternalTable(const TDriver& driver) { + NTable::TTableClient client(driver); + const auto sessionResult = client.CreateSession().ExtractValueSync(); + NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult); + const auto result = sessionResult.GetSession().DescribeExternalTable(Path).ExtractValueSync(); + NStatusHelpers::ThrowOnErrorOrPrintIssues(result); + + return PrintDescription(this, OutputFormat, result.GetExternalTableDescription(), &TCommandDescribe::PrintExternalTableResponsePretty); +} + namespace { void PrintColumns(const NTable::TTableDescription& tableDescription) { if (!tableDescription.GetTableColumns().size()) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h index cfd2b9df20..26bd5e62f6 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h @@ -105,6 +105,12 @@ private: int DescribeView(const TDriver& driver); int PrintViewResponsePretty(const NYdb::NView::TDescribeViewResult& result) const; + int DescribeExternalDataSource(const TDriver& driver); + int PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& result) const; + + int DescribeExternalTable(const TDriver& driver); + int PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& result) const; + int TryTopicConsumerDescribeOrFail(NYdb::TDriver& driver, const NScheme::TDescribePathResult& result); std::pair<TString, TString> ParseTopicConsumer() const; int PrintConsumerResponsePretty(const NYdb::NTopic::TConsumerDescription& description) const; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h index 7d928dedc2..e786e81f90 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h @@ -43,6 +43,8 @@ public: static ::google::protobuf::Map<TStringType, Ydb::TypedValue>* GetProtoMapPtr(TParams& params); static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); + static const Ydb::Table::DescribeExternalDataSourceResult& GetProto(const NTable::TExternalDataSourceDescription&); + static const Ydb::Table::DescribeExternalTableResult& GetProto(const NTable::TExternalTableDescription&); static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription); static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); diff --git a/ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp b/ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp index 98224b2578..131c50943f 100644 --- a/ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp +++ b/ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp @@ -12,6 +12,14 @@ const Ydb::Table::DescribeTableResult& TProtoAccessor::GetProto(const NTable::TT return tableDescription.GetProto(); } +const Ydb::Table::DescribeExternalDataSourceResult& TProtoAccessor::GetProto(const NTable::TExternalDataSourceDescription& description) { + return description.GetProto(); +} + +const Ydb::Table::DescribeExternalTableResult& TProtoAccessor::GetProto(const NTable::TExternalTableDescription& description) { + return description.GetProto(); +} + NTable::TQueryStats TProtoAccessor::FromProto(const Ydb::TableStats::QueryStats& queryStats) { return NTable::TQueryStats(queryStats); } diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp index 37067b16e3..0acf4dc966 100644 --- a/ydb/services/ydb/ydb_table.cpp +++ b/ydb/services/ydb/ydb_table.cpp @@ -85,6 +85,8 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST_LIMIT(CreateTable, DoCreateTableRequest, Rps, CREATETABLE) ADD_REQUEST_LIMIT(DropTable, DoDropTableRequest, Rps, DROPTABLE) ADD_REQUEST_LIMIT(DescribeTable, DoDescribeTableRequest, Rps, DESCRIBETABLE) + ADD_REQUEST_LIMIT(DescribeExternalDataSource, DoDescribeExternalDataSourceRequest, Rps, DESCRIBEEXTERNALDATASOURCE) + ADD_REQUEST_LIMIT(DescribeExternalTable, DoDescribeExternalTableRequest, Rps, DESCRIBEEXTERNALTABLE) ADD_REQUEST_LIMIT(CopyTable, DoCopyTableRequest, Rps, COPYTABLE) ADD_REQUEST_LIMIT(CopyTables, DoCopyTablesRequest, Rps, COPYTABLES) ADD_REQUEST_LIMIT(RenameTables, DoRenameTablesRequest, Rps, RENAMETABLES) |