diff options
author | ngc224 <[email protected]> | 2025-06-11 13:26:28 +0300 |
---|---|---|
committer | ngc224 <[email protected]> | 2025-06-11 13:51:07 +0300 |
commit | a4f376903fb769e2d40cc68cbb5ae8ce649b3a9c (patch) | |
tree | 0d759e25f2edcf7a6484b7646f6f60bc72b68961 | |
parent | 7b50230685393ac909d28facd0dea4b0b20d335f (diff) |
Tune sink settings
commit_hash:d3e3a331c231bc70efd908be97d7e011c85635de
-rw-r--r-- | yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp | 42 | ||||
-rw-r--r-- | yt/yql/providers/ytflow/integration/proto/yt.proto | 2 |
2 files changed, 35 insertions, 9 deletions
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp index 5cacad97831..2024ef2ca70 100644 --- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp @@ -1,4 +1,5 @@ #include "yql_yt_ytflow_integration.h" +#include "yql_yt_provider.h" #include "yql_yt_table.h" #include <yql/essentials/core/yql_expr_type_annotation.h> @@ -78,11 +79,18 @@ public: auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value()); auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table())); - auto epoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref()); + auto commitEpoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref()); - auto tableDesc = State_->TablesData->GetTable(cluster, tableName, epoch); + auto tableDesc = State_->TablesData->GetTable( + cluster, tableName, 0); - if (!tableDesc.Meta->IsDynamic) { + auto commitTableDesc = State_->TablesData->GetTable( + cluster, tableName, commitEpoch); + + if (!tableDesc.Meta->IsDynamic + && tableDesc.Meta->DoesExist + && !(commitTableDesc.Intents & TYtTableIntent::Override) + ) { AddMessage(ctx, "write to static table"); return false; } @@ -145,14 +153,30 @@ public: auto maybeWriteTable = TMaybeNode<TYtWriteTable>(&sink); YQL_ENSURE(maybeWriteTable); - auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>(); + NYtflow::NProto::TQYTSinkMessage sinkSettings; - auto* rowType = TYqlRowSpecInfo(table.RowSpec()).GetType(); + { + auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>(); - NYtflow::NProto::TQYTSinkMessage sinkSettings; - sinkSettings.SetCluster(table.Cluster().StringValue()); - sinkSettings.SetPath(table.Name().StringValue()); - sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType)); + sinkSettings.SetCluster(table.Cluster().StringValue()); + sinkSettings.SetPath(table.Name().StringValue()); + + auto* rowType = maybeWriteTable.Cast().Content().Ref().GetTypeAnn() + ->Cast<TListExprType>()->GetItemType(); + + sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType)); + } + + { + auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value()); + auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table())); + + auto tableDesc = State_->TablesData->GetTable( + cluster, tableName, 0); + + sinkSettings.SetDoesExist(tableDesc.Meta->DoesExist); + sinkSettings.SetTruncate(tableDesc.Intents & TYtTableIntent::Override); + } settings.PackFrom(sinkSettings); } diff --git a/yt/yql/providers/ytflow/integration/proto/yt.proto b/yt/yql/providers/ytflow/integration/proto/yt.proto index 3de6f6b985c..17401e4fccd 100644 --- a/yt/yql/providers/ytflow/integration/proto/yt.proto +++ b/yt/yql/providers/ytflow/integration/proto/yt.proto @@ -11,4 +11,6 @@ message TQYTSinkMessage { optional string Cluster = 1; optional string Path = 2; optional bytes RowType = 3; + optional bool DoesExist = 5; + optional bool Truncate = 6; } |