aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormshulb <mshulb@yandex-team.com>2023-11-02 12:44:17 +0300
committermshulb <mshulb@yandex-team.com>2023-11-02 13:08:01 +0300
commit2ed98c75a8679c51dfa9a86494a75a0ce2c92751 (patch)
treec92e630cace4ae36d67896fb9bf0e42bd36abc44
parentaa14f36616cf44694a09a0644717c76f91d8f917 (diff)
downloadydb-2ed98c75a8679c51dfa9a86494a75a0ce2c92751.tar.gz
KIKIMR-19695 Add initial support for DROP INDEX: parsing and preparing for proxy.
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp46
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp23
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json12
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h17
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp9
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp42
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp23
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.h12
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp107
12 files changed, 282 insertions, 22 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index ca56c8aac0..0540e7a542 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -575,7 +575,7 @@ public:
errResult.AddIssue(NYql::TIssue(error));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
tablePromise.SetValue(errResult);
- return tablePromise.GetFuture();
+ return tablePromise.GetFuture();
}
TGenericResult result;
result.SetSuccess();
@@ -631,7 +631,7 @@ public:
CHECK_PREPARED_DDL(AlterTable);
auto tablePromise = NewPromise<TGenericResult>();
-
+
if (!IsPrepare()) {
SessionCtx->Query().PrepareOnly = false;
if (SessionCtx->Query().PreparingQuery) {
@@ -711,7 +711,7 @@ public:
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
-
+
phyTx.MutableSchemeOperation()->MutableDropTable()->Swap(&schemeTx);
phyTx.MutableSchemeOperation()->MutableDropTable()->SetSuccessOnNotExist(settings.SuccessOnNotExist);
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index ccea19fd61..52e7652c3e 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -163,6 +163,12 @@ private:
return TStatus::Error;
}
+ TStatus HandlePgDropObject(TPgDropObject node, TExprContext& ctx) override {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
+ << "PgDropObject is not yet implemented for intent determination transformer"));
+ return TStatus::Error;
+ }
+
static void HandleDropTable(TIntrusivePtr<TKikimrSessionContext>& ctx, const NCommon::TWriteTableSettings& settings,
const TKikimrKey& key, const TStringBuf& cluster)
{
@@ -288,6 +294,8 @@ private:
return TStatus::Ok;
case TKikimrKey::Type::Permission:
return TStatus::Ok;
+ case TKikimrKey::Type::PGObject:
+ return TStatus::Ok;
}
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid table key type."));
@@ -471,6 +479,10 @@ public:
return true;
}
+ if(node.IsCallable(TPgDropObject::CallableName())) {
+ return true;
+ }
+
if (auto maybeRight = TMaybeNode<TCoNth>(&node).Tuple().Maybe<TCoRight>()) {
if (maybeRight.Input().Maybe<TKiExecDataQuery>()) {
return true;
@@ -507,6 +519,23 @@ public:
.Ptr();
}
+ static TExprNode::TPtr MakePgDropObject(const TExprNode::TPtr& node, const NCommon::TPgObjectSettings& settings,
+ const TKikimrKey& key, TExprContext& ctx)
+ {
+ bool missingOk = (settings.IfExists.Cast().Value() == "true");
+
+ return Build<TPgDropObject>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .ObjectId().Build(key.GetPGObjectId())
+ .TypeId().Build(key.GetPGObjectType())
+ .MissingOk<TCoAtom>()
+ .Value(missingOk)
+ .Build()
+ .Done()
+ .Ptr();
+ }
+
bool RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx, TExprNode::TPtr& resultNode) {
TKiDataSink dataSink(node->ChildPtr(1));
auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
@@ -942,6 +971,19 @@ public:
}
break;
}
+
+ case TKikimrKey::Type::PGObject: {
+ NCommon::TPgObjectSettings settings = NCommon::ParsePgObjectSettings(TExprList(node->Child(4)), ctx);
+
+ YQL_ENSURE(settings.Mode);
+ auto mode = settings.Mode.Cast();
+
+ if (mode == "dropIndex") {
+ return MakePgDropObject(node, settings, key, ctx);
+ } else {
+ YQL_ENSURE(false, "unknown PGObject mode \"" << TString(mode) << "\"");
+ }
+ }
}
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Failed to rewrite IO."));
@@ -1076,6 +1118,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleDropGroup(node.Cast(), ctx);
}
+ if(auto node = TMaybeNode<TPgDropObject>(input)) {
+ return HandlePgDropObject(node.Cast(), ctx);
+ }
+
if (input->IsCallable(WriteName)) {
return HandleWrite(TExprBase(input), ctx);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 8707c494cb..4c5af6a457 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -133,6 +133,8 @@ private:
return TStatus::Ok;
case TKikimrKey::Type::Permission:
return TStatus::Ok;
+ case TKikimrKey::Type::PGObject:
+ return TStatus::Ok;
}
return TStatus::Error;
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 43f6c30d86..e8193f7c53 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1834,6 +1834,29 @@ public:
}, "Executing DROP GROUP");
}
+ if (auto maybePgDropObject = TMaybeNode<TPgDropObject>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+ auto pgDrop = maybePgDropObject.Cast();
+
+ auto cluster = TString(pgDrop.DataSink().Cluster());
+ const auto type = TString(pgDrop.TypeId().Value());
+ const auto objectName = TString(pgDrop.ObjectId().Value());
+
+ if (type == "pgIndex") {
+ // TODO: KIKIMR-19695
+ ctx.AddError(TIssue(ctx.GetPosition(pgDrop.Pos()),
+ TStringBuilder() << "DROP INDEX for Postgres indexes is not implemented yet"));
+ return SyncError();
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(pgDrop.TypeId().Pos()),
+ TStringBuilder() << "Unknown PgDrop operation: " << type));
+ return SyncError();
+ }
+ }
+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
<< "(Kikimr DataSink) Failed to execute node: " << input->Content()));
return SyncError();
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index f386437d7a..799161c26e 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -381,6 +381,18 @@
{"Index": 4, "Name": "Pathes", "Type": "TCoAtomList"},
{"Index": 5, "Name": "Roles", "Type": "TCoAtomList"}
]
+ },
+ {
+ "Name": "TPgDropObject",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "PgDropObject!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
+ {"Index": 2, "Name": "ObjectId", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
+ {"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
+ ]
}
]
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index ffaa3f4594..b7928581df 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -62,6 +62,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiDataQueryBlocks::CallableName());
DataSinkNames.insert(TKiExecDataQuery::CallableName());
DataSinkNames.insert(TKiEffects::CallableName());
+ DataSinkNames.insert(TPgDropObject::CallableName());
CommitModes.insert(CommitModeFlush);
CommitModes.insert(CommitModeRollback);
@@ -397,6 +398,10 @@ bool TKikimrKey::Extract(const TExprNode& key) {
} else if(tagName == "permission") {
KeyType = Type::Permission;
Target = key.Child(0)->Child(1)->Child(0)->Content();
+ } else if (tagName == "pgObject") {
+ KeyType = Type::PGObject;
+ Target = key.Child(0)->Child(1)->Child(0)->Content();
+ ObjectType = key.Child(0)->Child(2)->Child(0)->Content();
} else {
Ctx.AddError(TIssue(Ctx.GetPosition(key.Child(0)->Pos()), TString("Unexpected tag for kikimr key: ") + tagName));
return false;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index 236952ae9c..25281b7a68 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -60,6 +60,7 @@ private:
virtual TStatus HandleDataQueryBlocks(NNodes::TKiDataQueryBlocks node, TExprContext& ctx) = 0;
virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) = 0;
virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0;
+ virtual TStatus HandlePgDropObject(NNodes::TPgDropObject node, TExprContext& ctx) = 0;
virtual TStatus HandleModifyPermissions(NNodes::TKiModifyPermissions node, TExprContext& ctx) = 0;
};
@@ -73,7 +74,8 @@ public:
Role,
Object,
Topic,
- Permission
+ Permission,
+ PGObject
};
struct TViewDescription {
@@ -137,6 +139,19 @@ public:
return Target;
}
+ const TString& GetPGObjectId() const {
+ Y_DEBUG_ABORT_UNLESS(KeyType.Defined());
+ Y_DEBUG_ABORT_UNLESS(KeyType == Type::PGObject);
+ return Target;
+ }
+
+ const TString& GetPGObjectType() const {
+ Y_DEBUG_ABORT_UNLESS(KeyType.Defined());
+ Y_DEBUG_ABORT_UNLESS(ObjectType.Defined());
+ Y_DEBUG_ABORT_UNLESS(KeyType == Type::PGObject);
+ return *ObjectType;
+ }
+
bool Extract(const TExprNode& key);
private:
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 06a55c8844..d87557a0b0 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -215,6 +215,10 @@ private:
{
return TStatus::Ok;
}
+ case TKikimrKey::Type::PGObject:
+ {
+ return TStatus::Ok;
+ }
}
return TStatus::Error;
@@ -1568,6 +1572,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}
+ virtual TStatus HandlePgDropObject(TPgDropObject node, TExprContext& /*ctx*/) override {
+ node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
+ return TStatus::Ok;
+ }
+
virtual TStatus HandleWrite(TExprBase node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Failed to annotate Write!, IO rewrite should handle this"));
return TStatus::Error;
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index beb3b91e17..593794c174 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1611,6 +1611,48 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}
+ Y_UNIT_TEST(DropIndex) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));
+
+ auto client = kikimr.GetQueryClient();
+ auto session = client.GetSession().GetValueSync().GetSession();
+ const auto txCtrl = NYdb::NQuery::TTxControl::NoTx();
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE TABLE test2(
+ id int8,
+ fk int8,
+ value char,
+ primary key(id)
+ );
+ CREATE INDEX "test2_fk_idx" ON test2 (fk);
+ CREATE INDEX "test2_fk.idx_cover" ON test2 (fk) INCLUDE(value);
+ )");
+
+ auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ DROP INDEX "test2_fk_idx";
+ )");
+
+ auto result = session.ExecuteQuery(query, txCtrl).ExtractValueSync();
+ // TODO: KIKIMR-19695
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+ // TODO: test with <schema>.<name>: "DROP INDEX test2_fk.idx_cover;"
+ }
+
Y_UNIT_TEST(CreateUniqPgColumn) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto client = kikimr.GetTableClient();
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 669aec0f42..42775a6e49 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -615,6 +615,29 @@ TCommitSettings ParseCommitSettings(NNodes::TCoCommit node, TExprContext& ctx) {
return ret;
}
+TPgObjectSettings ParsePgObjectSettings(NNodes::TExprList node, TExprContext&) {
+ TMaybeNode<TCoAtom> mode;
+ TMaybeNode<TCoAtom> ifExists;
+ for (auto child : node) {
+ if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
+ auto tuple = maybeTuple.Cast();
+ auto name = tuple.Name().Value();
+
+ if (name == "mode") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ mode = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "ifExists") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ ifExists = tuple.Value().Cast<TCoAtom>();
+ }
+ }
+ }
+
+ TPgObjectSettings ret(std::move(mode), std::move(ifExists));
+ return ret;
+}
+
+
TVector<TString> GetStructFields(const TTypeAnnotationNode* type) {
TVector<TString> fields;
if (type->GetKind() == ETypeAnnotationKind::List) {
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index 8bc6777edb..14d66073a1 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -120,6 +120,16 @@ struct TCommitSettings
bool EnsureOtherEmpty(TExprContext& ctx);
};
+struct TPgObjectSettings
+{
+ NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
+ NNodes::TMaybeNode<NNodes::TCoAtom> IfExists;
+
+ TPgObjectSettings(NNodes::TMaybeNode<NNodes::TCoAtom>&& mode, NNodes::TMaybeNode<NNodes::TCoAtom>&& ifExists)
+ : Mode(std::move(mode))
+ , IfExists(std::move(ifExists)) {}
+};
+
const TStructExprType* BuildCommonTableListType(TExprContext& ctx);
TExprNode::TPtr BuildTypeExpr(TPositionHandle pos, const TTypeAnnotationNode& ann, TExprContext& ctx);
@@ -138,6 +148,8 @@ TWritePermissionSettings ParseWritePermissionsSettings(NNodes::TExprList node, T
TCommitSettings ParseCommitSettings(NNodes::TCoCommit node, TExprContext& ctx);
+TPgObjectSettings ParsePgObjectSettings(NNodes::TExprList node, TExprContext& ctx);
+
TString FullTableName(const TStringBuf& cluster, const TStringBuf& table);
IDataProvider::TFillSettings GetFillSettings(const TExprNode& node);
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 57d1949499..19dd471cef 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -166,6 +166,23 @@ const IndexElem* IndexElement(const Node* node) {
#define AT_LOCATION_EX(node, field) \
TLocationGuard guard(this, node->field);
+std::tuple<TStringBuf, TStringBuf> getSchemaAndObjectName(const List* nameList) {
+ switch (ListLength(nameList)) {
+ case 2: {
+ const auto clusterName = StrVal(ListNodeNth(nameList, 0));
+ const auto tableName = StrVal(ListNodeNth(nameList, 1));
+ return {clusterName, tableName};
+ }
+ case 1: {
+ const auto tableName = StrVal(ListNodeNth(nameList, 0));
+ return {"", tableName};
+ }
+ default: {
+ return {"", ""};
+ }
+ }
+}
+
class TConverter : public IPGParseEvents {
friend class TLocationGuard;
@@ -1815,6 +1832,9 @@ public:
case OBJECT_TABLE: {
return ParseDropTableStmt(value, nameListNodes);
}
+ case OBJECT_INDEX: {
+ return ParseDropIndexStmt(value, nameListNodes);
+ }
default: {
AddError("Not supported object type for DROP");
return nullptr;
@@ -1858,24 +1878,7 @@ public:
}
for (const auto& nameList : names) {
- const auto getSchemaAndTableName = [] (const List* nameList) -> std::tuple<TStringBuf, TStringBuf> {
- switch (ListLength(nameList)) {
- case 2: {
- const auto clusterName = StrVal(ListNodeNth(nameList, 0));
- const auto tableName = StrVal(ListNodeNth(nameList, 1));
- return {clusterName, tableName};
- }
- case 1: {
- const auto tableName = StrVal(ListNodeNth(nameList, 0));
- return {"", tableName};
- }
- default: {
- return {"", ""};
- }
- }
- };
-
- const auto [clusterName, tableName] = getSchemaAndTableName(nameList);
+ const auto [clusterName, tableName] = getSchemaAndObjectName(nameList);
const auto [sink, key] = ParseQualifiedRelationName(
/* catalogName */ "",
clusterName,
@@ -1904,6 +1907,47 @@ public:
return Statements.back();
}
+ TAstNode* ParseDropIndexStmt(const DropStmt* value, const TVector<const List*>& names) {
+ if (value->behavior == DROP_CASCADE) {
+ AddError("CASCADE is not implemented");
+ return nullptr;
+ }
+
+ if (names.size() != 1) {
+ AddError("DROP INDEX requires exactly one index");
+ return nullptr;
+ }
+
+ for (const auto& nameList : names) {
+ const auto [clusterName, indexName] = getSchemaAndObjectName(nameList);
+ const auto [sink, key] = ParseQualifiedPgObjectName(
+ /* catalogName */ "",
+ clusterName,
+ indexName,
+ "pgIndex"
+ );
+
+ TString missingOk = (value->missing_ok) ? "true" : "false";
+ Statements.push_back(L(
+ A("let"),
+ A("world"),
+ L(
+ A("Write!"),
+ A("world"),
+ sink,
+ key,
+ L(A("Void")),
+ QL(
+ QL(QA("mode"), QA("dropIndex")),
+ QL(QA("ifExists"), QA(missingOk))
+ )
+ )
+ ));
+ }
+
+ return Statements.back();
+ }
+
[[nodiscard]]
TAstNode* ParseVariableSetStmt(const VariableSetStmt* value) {
if (value->kind != VAR_SET_VALUE) {
@@ -2338,6 +2382,33 @@ public:
return {sinkOrSource, key};
}
+
+ TAstNode* BuildPgObjectExpression(const TStringBuf objectName, const TStringBuf objectType) {
+ return L(A("Key"), QL(QA("pgObject"),
+ L(A("String"), QA(objectName)),
+ L(A("String"), QA(objectType))
+ ));
+ }
+
+ TReadWriteKeyExprs ParseQualifiedPgObjectName(const TStringBuf catalogname,
+ const TStringBuf schemaname,
+ const TStringBuf objectName,
+ const TStringBuf pgObjectType) {
+ if (!catalogname.Empty()) {
+ AddError("catalogname is not supported");
+ return {};
+ }
+ if (objectName.Empty()) {
+ AddError("objectName should be specified");
+ return {};
+ }
+
+ const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster;
+ const auto sinkOrSource = BuildClusterSinkOrSourceExpression(true, cluster);
+ const auto key = BuildPgObjectExpression(objectName, pgObjectType);
+ return {sinkOrSource, key};
+ }
+
TReadWriteKeyExprs ParseWriteRangeVar(const RangeVar *value,
bool isScheme = false) {
if (value->alias) {