diff options
| author | maxim-yurchuk <[email protected]> | 2025-02-07 16:27:30 +0300 |
|---|---|---|
| committer | maxim-yurchuk <[email protected]> | 2025-02-07 16:48:13 +0300 |
| commit | f1f926eea7fcf8945c320e5688d850776dcf9d6c (patch) | |
| tree | 4a6e1fb5c11e261d9ad5bd7543a516f257fc025f | |
| parent | a0c76f5d223c9437a71520b17b717c0cf0a2ac0e (diff) | |
Use yt/yql/providers for allow list (github ydb sync)
<https://github.com/ydb-platform/ydb/pull/14297#issuecomment-2641753237>
<https://storage.yandexcloud.net/ydb-gh-logs/ydb-platform/ydb/PR-check/13191654819/ya-x86-64/graph_compare_log.txt>
```
Error[-WBadDir]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: PEERDIR to missing directory: $S/yt/yql/providers/ytflow/expr_nodes
Error[-WBadDir]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: PEERDIR to missing directory: $S/yt/yql/providers/ytflow/integration/interface
Error[-WBadDir]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: PEERDIR to missing directory: $S/yt/yql/providers/ytflow/integration/proto
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_provider.h
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_provider.h
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/integration/proto/yt.pb.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
Error[-WBadIncl]: in $B/yt/yql/providers/yt/provider/libproviders-yt-provider.a: could not resolve include file: yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h included from here: $S/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp
```
\--при очередном синке возникла проблема, что не хватает таргетов
commit_hash:c402a15c3d753b7878d3b2d96c991c1c17d016a4
11 files changed, 377 insertions, 0 deletions
diff --git a/yt/yql/providers/ytflow/expr_nodes/ya.make b/yt/yql/providers/ytflow/expr_nodes/ya.make new file mode 100644 index 00000000000..04d6ce6f538 --- /dev/null +++ b/yt/yql/providers/ytflow/expr_nodes/ya.make @@ -0,0 +1,33 @@ +LIBRARY() + +PEERDIR( + yql/essentials/core/expr_nodes + yql/essentials/providers/common/provider +) + +SRCS( + yql_ytflow_expr_nodes.cpp +) + +SRCDIR( + yql/essentials/core/expr_nodes_gen +) + +RUN_PROGRAM( + yql/essentials/core/expr_nodes_gen/gen + yql_expr_nodes_gen.jnj + yql_ytflow_expr_nodes.json + yql_ytflow_expr_nodes.gen.h + yql_ytflow_expr_nodes.decl.inl.h + yql_ytflow_expr_nodes.defs.inl.h + IN yql_expr_nodes_gen.jnj + IN yql_ytflow_expr_nodes.json + OUT yql_ytflow_expr_nodes.gen.h + OUT yql_ytflow_expr_nodes.decl.inl.h + OUT yql_ytflow_expr_nodes.defs.inl.h + OUTPUT_INCLUDES + ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h + ${ARCADIA_ROOT}/util/generic/hash_set.h +) + +END() diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp new file mode 100644 index 00000000000..3598e4b76e6 --- /dev/null +++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.cpp @@ -0,0 +1,51 @@ +#include "yql_ytflow_expr_nodes.h" + + +namespace NYql::NNodes { + +TYtflowDSource::TYtflowDSource(const TExprNode* node) + : TYtflowDSourceStub(node) +{ +} + +TYtflowDSource::TYtflowDSource(const TExprNode::TPtr& node) + : TYtflowDSourceStub(node) +{ +} + +bool TYtflowDSource::Match(const TExprNode* node) { + if (!TYtflowDSourceStub::Match(node)) { + return false; + } + + if (node->Child(TYtflowDSource::idx_Category)->Content() != YtflowProviderName) { + return false; + } + + return true; +} + + +TYtflowDSink::TYtflowDSink(const TExprNode* node) + : TYtflowDSinkStub(node) +{ +} + +TYtflowDSink::TYtflowDSink(const TExprNode::TPtr& node) + : TYtflowDSinkStub(node) +{ +} + +bool TYtflowDSink::Match(const TExprNode* node) { + if (!TYtflowDSinkStub::Match(node)) { + return false; + } + + if (node->Child(TYtflowDSink::idx_Category)->Content() != YtflowProviderName) { + return false; + } + + return true; +} + +} // namespace NYql::NNodes diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h new file mode 100644 index 00000000000..12ec7791099 --- /dev/null +++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.h @@ -0,0 +1,31 @@ +#pragma once + +#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.gen.h> + + +namespace NYql::NNodes { + +#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.decl.inl.h> + +class TYtflowDSource: public NGenerated::TYtflowDSourceStub<TExprBase, TCallable, TCoAtom> { +public: + explicit TYtflowDSource(const TExprNode* node); + explicit TYtflowDSource(const TExprNode::TPtr& node); + + static bool Match(const TExprNode* node); +}; + + +class TYtflowDSink: public NGenerated::TYtflowDSinkStub<TExprBase, TCallable, TCoAtom> { +public: + explicit TYtflowDSink(const TExprNode* node); + explicit TYtflowDSink(const TExprNode::TPtr& node); + + static bool Match(const TExprNode* node); +}; + +#include <yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.defs.inl.h> + +} // namespace NYql::NNodes diff --git a/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json new file mode 100644 index 00000000000..02943510157 --- /dev/null +++ b/yt/yql/providers/ytflow/expr_nodes/yql_ytflow_expr_nodes.json @@ -0,0 +1,128 @@ +{ + "NodeRootType": "TExprBase", + "NodeBuilderBase": "TNodeBuilderBase", + "ListBuilderBase": "TListBuilderBase", + "FreeArgCallableBase": "TFreeArgCallable", + "FreeArgBuilderBase": "TFreeArgCallableBuilderBase", + "Nodes": [ + { + "Name": "TYtflowDSource", + "Base": "TCallable", + "Definition": "Custom", + "Builder": {"Generate": "None"}, + "Match": {"Type": "Callable", "Name": "DataSource"}, + "Children": [ + {"Index": 0, "Name": "Category", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Cluster", "Type": "TCoAtom"} + ] + }, + { + "Name": "TYtflowDSink", + "Base": "TCallable", + "Definition": "Custom", + "Builder": {"Generate": "None"}, + "Match": {"Type": "Callable", "Name": "DataSink"}, + "Children": [ + {"Index": 0, "Name": "Category", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Cluster", "Type": "TCoAtom"} + ] + }, + { + "Name": "TYtflowReadWrap", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowReadWrap"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] + }, + { + "Name": "TYtflowWriteWrap", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowWriteWrap"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] + }, + { + "Name": "TYtflowReadStub", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowReadStub!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "ItemType", "Type": "TExprBase"} + ] + }, + { + "Name": "TYtflowPersistentSource", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowPersistentSource"}, + "Children": [ + {"Index": 0, "Name": "Name", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Input", "Type": "TYtflowReadWrap"} + ] + }, + { + "Name": "TYtflowSinkBase", + "Base": "TCallable", + "Match": {"Type": "CallableBase"}, + "Builder": {"Generate": "None"}, + "Children": [ + {"Index": 0, "Name": "Name", "Type": "TCoAtom"} + ] + }, + { + "Name": "TYtflowIntermediateSink", + "Base": "TYtflowSinkBase", + "Match": {"Type": "Callable", "Name": "YtflowIntermediateSink"}, + "Children": [ + {"Index": 1, "Name": "RowType", "Type": "TExprBase"} + ] + }, + { + "Name": "TYtflowPersistentSink", + "Base": "TYtflowSinkBase", + "Match": {"Type": "Callable", "Name": "YtflowPersistentSink"}, + "Children": [ + {"Index": 1, "Name": "Input", "Type": "TYtflowWriteWrap"} + ] + }, + { + "Name": "TYtflowOpBase", + "Base": "TCallable", + "Match": {"Type": "CallableBase"}, + "Builder": {"Generate": "None"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "Sources", "Type": "TExprList"}, + {"Index": 2, "Name": "Sinks", "Type": "TExprList"}, + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TYtflowMap", + "Base": "TYtflowOpBase", + "Match": {"Type": "Callable", "Name": "YtflowMap!"}, + "Children": [ + {"Index": 4, "Name": "Lambda", "Type": "TCoLambda"} + ] + }, + { + "Name": "TYtflowOutput", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowOutput"}, + "Children": [ + {"Index": 0, "Name": "Operation", "Type": "TYtflowOpBase"}, + {"Index": 1, "Name": "OutputIndex", "Type": "TCoAtom"} + ] + }, + { + "Name": "TYtflowPublish", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtflowPublish!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + } + ] +} diff --git a/yt/yql/providers/ytflow/integration/interface/ya.make b/yt/yql/providers/ytflow/integration/interface/ya.make new file mode 100644 index 00000000000..0bdacd34e19 --- /dev/null +++ b/yt/yql/providers/ytflow/integration/interface/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +PEERDIR( + yql/essentials/ast +) + +SRCS( + yql_ytflow_integration.cpp + yql_ytflow_optimization.cpp +) + +END() diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp new file mode 100644 index 00000000000..cfe0dbb8a31 --- /dev/null +++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.cpp @@ -0,0 +1 @@ +#include "yql_ytflow_integration.h" diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h new file mode 100644 index 00000000000..1564be87f97 --- /dev/null +++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_integration.h @@ -0,0 +1,43 @@ +#pragma once + +#include <yql/essentials/ast/yql_expr.h> + +#include <util/generic/maybe.h> + + +namespace google::protobuf { + class Any; +} // namespace google::protobuf + + +namespace NYql { + +class IYtflowIntegration { +public: + virtual ~IYtflowIntegration() = default; + + // Nothing if callable is not for reading, + // false if callable is for reading and there are some errors (they are added to ctx), + // true if callable is for reading and no issues occured. + virtual TMaybe<bool> CanRead(const TExprNode& read, TExprContext& ctx) = 0; + virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + // Nothing if callable is not for writing, + // false if callable is for writing and there are some errors (they are added to ctx), + // true if callable is for writing and no issues occured. + virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0; + virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; + + virtual TExprNode::TPtr GetReadWorld(const TExprNode& read, TExprContext& ctx) = 0; + virtual TExprNode::TPtr GetWriteWorld(const TExprNode& write, TExprContext& ctx) = 0; + + virtual TExprNode::TPtr GetWriteContent(const TExprNode& write, TExprContext& ctx) = 0; + + virtual void FillSourceSettings( + const TExprNode& source, ::google::protobuf::Any& settings, TExprContext& ctx) = 0; + + virtual void FillSinkSettings( + const TExprNode& sink, ::google::protobuf::Any& settings, TExprContext& ctx) = 0; +}; + +} // namespace NYql diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp new file mode 100644 index 00000000000..78e67047a92 --- /dev/null +++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.cpp @@ -0,0 +1 @@ +#include "yql_ytflow_optimization.h" diff --git a/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h new file mode 100644 index 00000000000..48fcbcf167d --- /dev/null +++ b/yt/yql/providers/ytflow/integration/interface/yql_ytflow_optimization.h @@ -0,0 +1,54 @@ +#pragma once + +#include <yql/essentials/ast/yql_expr.h> + + +namespace NYql { + +class IYtflowOptimization { +public: + virtual ~IYtflowOptimization() = default; + + /** + Apply new members subset for YtflowReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * members - expr list of atoms with new members + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read with applied new members + */ + virtual TExprNode::TPtr ApplyExtractMembers( + const TExprNode::TPtr& read, const TExprNode::TPtr& members, + TExprContext& ctx) = 0; + + /** + Apply `unordered` setting for YtflowReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read with applied setting + */ + virtual TExprNode::TPtr ApplyUnordered( + const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + /** + Rewrite YtflowWriteWrap's underlying provider specific write callable + Args: + * write - provider specific write callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `write`, if no changes + * new write with trimmed content + */ + virtual TExprNode::TPtr TrimWriteContent( + const TExprNode::TPtr& write, TExprContext& ctx) = 0; +}; + +} // namespace NYql diff --git a/yt/yql/providers/ytflow/integration/proto/ya.make b/yt/yql/providers/ytflow/integration/proto/ya.make new file mode 100644 index 00000000000..317d796ff17 --- /dev/null +++ b/yt/yql/providers/ytflow/integration/proto/ya.make @@ -0,0 +1,9 @@ +PROTO_LIBRARY() + +SRCS( + yt.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/yt/yql/providers/ytflow/integration/proto/yt.proto b/yt/yql/providers/ytflow/integration/proto/yt.proto new file mode 100644 index 00000000000..3de6f6b985c --- /dev/null +++ b/yt/yql/providers/ytflow/integration/proto/yt.proto @@ -0,0 +1,14 @@ +package NYql.NYtflow.NProto; + + +message TQYTSourceMessage { + optional string Cluster = 1; + optional string Path = 2; + optional bytes RowType = 3; +} + +message TQYTSinkMessage { + optional string Cluster = 1; + optional string Path = 2; + optional bytes RowType = 3; +} |
