aboutsummaryrefslogtreecommitdiffstats
path: root/yql
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2024-11-28 17:44:07 +0300
committerudovichenko-r <udovichenko-r@yandex-team.com>2024-11-28 17:59:29 +0300
commit796e6186c6652f49958e68c7eb0f06c52827e702 (patch)
tree1d5b6d15afe603604b5cd47af899ee9e48e7ad3a /yql
parentd914da33a8058c17411fe1c33b6deed930f29450 (diff)
downloadydb-796e6186c6652f49958e68c7eb0f06c52827e702.tar.gz
YQL-19309 Remove IDqIntegration dependency on TDqSettings
commit_hash:46cec1b389108c3cf83c120f92b37b2ebd38a8e3
Diffstat (limited to 'yql')
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_integration.h21
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp5
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.h5
-rw-r--r--yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp2
4 files changed, 22 insertions, 11 deletions
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h
index 4f08726cf2..d1aaa655a6 100644
--- a/yql/essentials/core/dq_integration/yql_dq_integration.h
+++ b/yql/essentials/core/dq_integration/yql_dq_integration.h
@@ -21,7 +21,6 @@ class TJsonValue;
namespace NYql {
-struct TDqSettings;
class TTransformationPipeline;
namespace NCommon {
@@ -45,12 +44,26 @@ class IDqIntegration {
public:
virtual ~IDqIntegration() {}
- virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
- TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
+ struct TPartitionSettings {
+ TMaybe<ui64> DataSizePerJob;
+ size_t MaxPartitions = 0;
+ TMaybe<bool> EnableComputeActor;
+ bool CanFallback = false;
+ };
+
+ virtual ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) = 0;
virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0;
virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
- virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ struct TWrapReadSettings {
+ TMaybe<TString> WatermarksMode;
+ TMaybe<ui64> WatermarksGranularityMs;
+ TMaybe<ui64> WatermarksLateArrivalDelayMs;
+ TMaybe<bool> WatermarksEnableIdlePartitions;
+ };
+
+ virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) = 0;
virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0;
virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
index 600e3c72e2..1209b53c77 100644
--- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
@@ -2,8 +2,7 @@
namespace NYql {
-ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&,
- TVector<TString>&, TString*, TExprContext&, bool) {
+ui64 TDqIntegrationBase::Partition(const TExprNode&, TVector<TString>&, TString*, TExprContext&, const TPartitionSettings& ) {
return 0;
}
@@ -22,7 +21,7 @@ TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<cons
return Nothing();
}
-TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) {
+TExprNode::TPtr TDqIntegrationBase::WrapRead(const TExprNode::TPtr& read, TExprContext&, const TWrapReadSettings& ) {
return read;
}
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
index 7cc5fd7d89..4624caede4 100644
--- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
@@ -6,12 +6,11 @@ namespace NYql {
class TDqIntegrationBase: public IDqIntegration {
public:
- ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
- TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override;
+ ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override;
bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override;
bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override;
- TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
+ TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) override;
TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override;
TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
diff --git a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
index 2e8a9b434f..263e0fe310 100644
--- a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
+++ b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
@@ -25,7 +25,7 @@ public:
return Nothing();
}
- ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
+ ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
partitions.clear();
partitions.emplace_back();
return 0ULL;