summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaxim-yurchuk <[email protected]>2025-02-07 21:49:51 +0300
committermaxim-yurchuk <[email protected]>2025-02-07 22:06:08 +0300
commitbddf3d433f03b81e174e294aec8de67f575509d8 (patch)
treea46b5cb4b7cd4e4e9b51e38014ca4200e4a9ffc0
parentcf8f9833929b938f683b622c78d004913425ebb9 (diff)
Revert commit rXXXXXX, Revert commit rXXXXXX, Use yt/yql/providers for allow list (github ydb sync)
Оказалось что директорию все таки можно использовать. Без нее не собирается (просто изначально были слишком широкие зависимости, они были исправлены в <HIDDEN_URL> ) commit_hash:3080f60346b4e3e213f113d74154e1a1eb0b979c
-rw-r--r--yt/yql/providers/ytflow/expr_nodes/ya.make33
-rw-r--r--yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp51
-rw-r--r--yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h31
-rw-r--r--yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json128
-rw-r--r--yt/yql/providers/ytflow/integration/interface/ya.make12
-rw-r--r--yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp1
-rw-r--r--yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h43
-rw-r--r--yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp1
-rw-r--r--yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h54
-rw-r--r--yt/yql/providers/ytflow/integration/proto/ya.make9
-rw-r--r--yt/yql/providers/ytflow/integration/proto/yt.proto14
11 files changed, 377 insertions, 0 deletions
diff --git a/yt/yql/providers/ytflow/expr_nodes/ya.make b/yt/yql/providers/ytflow/expr_nodes/ya.make
new file mode 100644
index 00000000000..04d6ce6f538
--- /dev/null
+++ b/yt/yql/providers/ytflow/expr_nodes/ya.make
@@ -0,0 +1,33 @@
+LIBRARY()
+
+PEERDIR(
+ yql/essentials/core/expr_nodes
+ yql/essentials/providers/common/provider
+)
+
+SRCS(
+ yql_ytflow_expr_nodes.cpp
+)
+
+SRCDIR(
+ yql/essentials/core/expr_nodes_gen
+)
+
+RUN_PROGRAM(
+ yql/essentials/core/expr_nodes_gen/gen
+ yql_expr_nodes_gen.jnj
+ yql_ytflow_expr_nodes.json
+ yql_ytflow_expr_nodes.gen.h
+ yql_ytflow_expr_nodes.decl.inl.h
+ yql_ytflow_expr_nodes.defs.inl.h
+ IN yql_expr_nodes_gen.jnj
+ IN yql_ytflow_expr_nodes.json
+ OUT yql_ytflow_expr_nodes.gen.h
+ OUT yql_ytflow_expr_nodes.decl.inl.h
+ OUT yql_ytflow_expr_nodes.defs.inl.h
+ OUTPUT_INCLUDES
+ ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
+ ${ARCADIA_ROOT}/util/generic/hash_set.h
+)
+
+END()
diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp
new file mode 100644
index 00000000000..3598e4b76e6
--- /dev/null
+++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp
@@ -0,0 +1,51 @@
+#include "yql_ytflow_expr_nodes.h"
+
+
+namespace NYql::NNodes {
+
+TYtflowDSource::TYtflowDSource(const TExprNode* node)
+ : TYtflowDSourceStub(node)
+{
+}
+
+TYtflowDSource::TYtflowDSource(const TExprNode::TPtr& node)
+ : TYtflowDSourceStub(node)
+{
+}
+
+bool TYtflowDSource::Match(const TExprNode* node) {
+ if (!TYtflowDSourceStub::Match(node)) {
+ return false;
+ }
+
+ if (node->Child(TYtflowDSource::idx_Category)->Content() != YtflowProviderName) {
+ return false;
+ }
+
+ return true;
+}
+
+
+TYtflowDSink::TYtflowDSink(const TExprNode* node)
+ : TYtflowDSinkStub(node)
+{
+}
+
+TYtflowDSink::TYtflowDSink(const TExprNode::TPtr& node)
+ : TYtflowDSinkStub(node)
+{
+}
+
+bool TYtflowDSink::Match(const TExprNode* node) {
+ if (!TYtflowDSinkStub::Match(node)) {
+ return false;
+ }
+
+ if (node->Child(TYtflowDSink::idx_Category)->Content() != YtflowProviderName) {
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace NYql::NNodes
diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h
new file mode 100644
index 00000000000..12ec7791099
--- /dev/null
+++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.gen.h>
+
+
+namespace NYql::NNodes {
+
+#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.decl.inl.h>
+
+class TYtflowDSource: public NGenerated::TYtflowDSourceStub<TExprBase, TCallable, TCoAtom> {
+public:
+ explicit TYtflowDSource(const TExprNode* node);
+ explicit TYtflowDSource(const TExprNode::TPtr& node);
+
+ static bool Match(const TExprNode* node);
+};
+
+
+class TYtflowDSink: public NGenerated::TYtflowDSinkStub<TExprBase, TCallable, TCoAtom> {
+public:
+ explicit TYtflowDSink(const TExprNode* node);
+ explicit TYtflowDSink(const TExprNode::TPtr& node);
+
+ static bool Match(const TExprNode* node);
+};
+
+#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.defs.inl.h>
+
+} // namespace NYql::NNodes
diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json
new file mode 100644
index 00000000000..02943510157
--- /dev/null
+++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json
@@ -0,0 +1,128 @@
+{
+ "NodeRootType": "TExprBase",
+ "NodeBuilderBase": "TNodeBuilderBase",
+ "ListBuilderBase": "TListBuilderBase",
+ "FreeArgCallableBase": "TFreeArgCallable",
+ "FreeArgBuilderBase": "TFreeArgCallableBuilderBase",
+ "Nodes": [
+ {
+ "Name": "TYtflowDSource",
+ "Base": "TCallable",
+ "Definition": "Custom",
+ "Builder": {"Generate": "None"},
+ "Match": {"Type": "Callable", "Name": "DataSource"},
+ "Children": [
+ {"Index": 0, "Name": "Category", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Cluster", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TYtflowDSink",
+ "Base": "TCallable",
+ "Definition": "Custom",
+ "Builder": {"Generate": "None"},
+ "Match": {"Type": "Callable", "Name": "DataSink"},
+ "Children": [
+ {"Index": 0, "Name": "Category", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Cluster", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TYtflowReadWrap",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowReadWrap"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TYtflowWriteWrap",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowWriteWrap"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TYtflowReadStub",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowReadStub!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "ItemType", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TYtflowPersistentSource",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowPersistentSource"},
+ "Children": [
+ {"Index": 0, "Name": "Name", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Input", "Type": "TYtflowReadWrap"}
+ ]
+ },
+ {
+ "Name": "TYtflowSinkBase",
+ "Base": "TCallable",
+ "Match": {"Type": "CallableBase"},
+ "Builder": {"Generate": "None"},
+ "Children": [
+ {"Index": 0, "Name": "Name", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TYtflowIntermediateSink",
+ "Base": "TYtflowSinkBase",
+ "Match": {"Type": "Callable", "Name": "YtflowIntermediateSink"},
+ "Children": [
+ {"Index": 1, "Name": "RowType", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TYtflowPersistentSink",
+ "Base": "TYtflowSinkBase",
+ "Match": {"Type": "Callable", "Name": "YtflowPersistentSink"},
+ "Children": [
+ {"Index": 1, "Name": "Input", "Type": "TYtflowWriteWrap"}
+ ]
+ },
+ {
+ "Name": "TYtflowOpBase",
+ "Base": "TCallable",
+ "Match": {"Type": "CallableBase"},
+ "Builder": {"Generate": "None"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Sources", "Type": "TExprList"},
+ {"Index": 2, "Name": "Sinks", "Type": "TExprList"},
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TYtflowMap",
+ "Base": "TYtflowOpBase",
+ "Match": {"Type": "Callable", "Name": "YtflowMap!"},
+ "Children": [
+ {"Index": 4, "Name": "Lambda", "Type": "TCoLambda"}
+ ]
+ },
+ {
+ "Name": "TYtflowOutput",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowOutput"},
+ "Children": [
+ {"Index": 0, "Name": "Operation", "Type": "TYtflowOpBase"},
+ {"Index": 1, "Name": "OutputIndex", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TYtflowPublish",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtflowPublish!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ }
+ ]
+}
diff --git a/yt/yql/providers/ytflow/integration/interface/ya.make b/yt/yql/providers/ytflow/integration/interface/ya.make
new file mode 100644
index 00000000000..0bdacd34e19
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/interface/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+PEERDIR(
+ yql/essentials/ast
+)
+
+SRCS(
+ yql_ytflow_integration.cpp
+ yql_ytflow_optimization.cpp
+)
+
+END()
diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp
new file mode 100644
index 00000000000..cfe0dbb8a31
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp
@@ -0,0 +1 @@
+#include "yql_ytflow_integration.h"
diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h
new file mode 100644
index 00000000000..1564be87f97
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <yql/essentials/ast/yql_expr.h>
+
+#include <util/generic/maybe.h>
+
+
+namespace google::protobuf {
+ class Any;
+} // namespace google::protobuf
+
+
+namespace NYql {
+
+class IYtflowIntegration {
+public:
+ virtual ~IYtflowIntegration() = default;
+
+ // Nothing if callable is not for reading,
+ // false if callable is for reading and there are some errors (they are added to ctx),
+ // true if callable is for reading and no issues occured.
+ virtual TMaybe<bool> CanRead(const TExprNode& read, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ // Nothing if callable is not for writing,
+ // false if callable is for writing and there are some errors (they are added to ctx),
+ // true if callable is for writing and no issues occured.
+ virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
+
+ virtual TExprNode::TPtr GetReadWorld(const TExprNode& read, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr GetWriteWorld(const TExprNode& write, TExprContext& ctx) = 0;
+
+ virtual TExprNode::TPtr GetWriteContent(const TExprNode& write, TExprContext& ctx) = 0;
+
+ virtual void FillSourceSettings(
+ const TExprNode& source, ::google::protobuf::Any& settings, TExprContext& ctx) = 0;
+
+ virtual void FillSinkSettings(
+ const TExprNode& sink, ::google::protobuf::Any& settings, TExprContext& ctx) = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp
new file mode 100644
index 00000000000..78e67047a92
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp
@@ -0,0 +1 @@
+#include "yql_ytflow_optimization.h"
diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h
new file mode 100644
index 00000000000..48fcbcf167d
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <yql/essentials/ast/yql_expr.h>
+
+
+namespace NYql {
+
+class IYtflowOptimization {
+public:
+ virtual ~IYtflowOptimization() = default;
+
+ /**
+ Apply new members subset for YtflowReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * members - expr list of atoms with new members
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read with applied new members
+ */
+ virtual TExprNode::TPtr ApplyExtractMembers(
+ const TExprNode::TPtr& read, const TExprNode::TPtr& members,
+ TExprContext& ctx) = 0;
+
+ /**
+ Apply `unordered` setting for YtflowReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read with applied setting
+ */
+ virtual TExprNode::TPtr ApplyUnordered(
+ const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ /**
+ Rewrite YtflowWriteWrap's underlying provider specific write callable
+ Args:
+ * write - provider specific write callable
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `write`, if no changes
+ * new write with trimmed content
+ */
+ virtual TExprNode::TPtr TrimWriteContent(
+ const TExprNode::TPtr& write, TExprContext& ctx) = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/ytflow/integration/proto/ya.make b/yt/yql/providers/ytflow/integration/proto/ya.make
new file mode 100644
index 00000000000..317d796ff17
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/proto/ya.make
@@ -0,0 +1,9 @@
+PROTO_LIBRARY()
+
+SRCS(
+ yt.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/yt/yql/providers/ytflow/integration/proto/yt.proto b/yt/yql/providers/ytflow/integration/proto/yt.proto
new file mode 100644
index 00000000000..3de6f6b985c
--- /dev/null
+++ b/yt/yql/providers/ytflow/integration/proto/yt.proto
@@ -0,0 +1,14 @@
+package NYql.NYtflow.NProto;
+
+
+message TQYTSourceMessage {
+ optional string Cluster = 1;
+ optional string Path = 2;
+ optional bytes RowType = 3;
+}
+
+message TQYTSinkMessage {
+ optional string Cluster = 1;
+ optional string Path = 2;
+ optional bytes RowType = 3;
+}