diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-11-28 17:44:07 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-11-28 17:59:29 +0300 |
commit | 796e6186c6652f49958e68c7eb0f06c52827e702 (patch) | |
tree | 1d5b6d15afe603604b5cd47af899ee9e48e7ad3a /yql | |
parent | d914da33a8058c17411fe1c33b6deed930f29450 (diff) | |
download | ydb-796e6186c6652f49958e68c7eb0f06c52827e702.tar.gz |
YQL-19309 Remove IDqIntegration dependency on TDqSettings
commit_hash:46cec1b389108c3cf83c120f92b37b2ebd38a8e3
Diffstat (limited to 'yql')
4 files changed, 22 insertions, 11 deletions
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h index 4f08726cf2..d1aaa655a6 100644 --- a/yql/essentials/core/dq_integration/yql_dq_integration.h +++ b/yql/essentials/core/dq_integration/yql_dq_integration.h @@ -21,7 +21,6 @@ class TJsonValue; namespace NYql { -struct TDqSettings; class TTransformationPipeline; namespace NCommon { @@ -45,12 +44,26 @@ class IDqIntegration { public: virtual ~IDqIntegration() {} - virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, - TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0; + struct TPartitionSettings { + TMaybe<ui64> DataSizePerJob; + size_t MaxPartitions = 0; + TMaybe<bool> EnableComputeActor; + bool CanFallback = false; + }; + + virtual ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) = 0; virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0; virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0; - virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + struct TWrapReadSettings { + TMaybe<TString> WatermarksMode; + TMaybe<ui64> WatermarksGranularityMs; + TMaybe<ui64> WatermarksLateArrivalDelayMs; + TMaybe<bool> WatermarksEnableIdlePartitions; + }; + + virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) = 0; virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0; virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp index 600e3c72e2..1209b53c77 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp @@ -2,8 +2,7 @@ namespace NYql { -ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&, - TVector<TString>&, TString*, TExprContext&, bool) { +ui64 TDqIntegrationBase::Partition(const TExprNode&, TVector<TString>&, TString*, TExprContext&, const TPartitionSettings& ) { return 0; } @@ -22,7 +21,7 @@ TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<cons return Nothing(); } -TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) { +TExprNode::TPtr TDqIntegrationBase::WrapRead(const TExprNode::TPtr& read, TExprContext&, const TWrapReadSettings& ) { return read; } diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h index 7cc5fd7d89..4624caede4 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h @@ -6,12 +6,11 @@ namespace NYql { class TDqIntegrationBase: public IDqIntegration { public: - ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, - TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override; + ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override; bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override; bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override; TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override; - TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) override; TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override; TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; diff --git a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp index 2e8a9b434f..263e0fe310 100644 --- a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp +++ b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp @@ -25,7 +25,7 @@ public: return Nothing(); } - ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override { partitions.clear(); partitions.emplace_back(); return 0ULL; |