summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorngc224 <[email protected]>2025-06-11 13:26:28 +0300
committerngc224 <[email protected]>2025-06-11 13:51:07 +0300
commita4f376903fb769e2d40cc68cbb5ae8ce649b3a9c (patch)
tree0d759e25f2edcf7a6484b7646f6f60bc72b68961
parent7b50230685393ac909d28facd0dea4b0b20d335f (diff)
Tune sink settings
commit_hash:d3e3a331c231bc70efd908be97d7e011c85635de
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp42
-rw-r--r--yt/yql/providers/ytflow/integration/proto/yt.proto2
2 files changed, 35 insertions, 9 deletions
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
index 5cacad97831..2024ef2ca70 100644
--- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
@@ -1,4 +1,5 @@
#include "yql_yt_ytflow_integration.h"
+#include "yql_yt_provider.h"
#include "yql_yt_table.h"
#include <yql/essentials/core/yql_expr_type_annotation.h>
@@ -78,11 +79,18 @@ public:
auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value());
auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
- auto epoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref());
+ auto commitEpoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref());
- auto tableDesc = State_->TablesData->GetTable(cluster, tableName, epoch);
+ auto tableDesc = State_->TablesData->GetTable(
+ cluster, tableName, 0);
- if (!tableDesc.Meta->IsDynamic) {
+ auto commitTableDesc = State_->TablesData->GetTable(
+ cluster, tableName, commitEpoch);
+
+ if (!tableDesc.Meta->IsDynamic
+ && tableDesc.Meta->DoesExist
+ && !(commitTableDesc.Intents & TYtTableIntent::Override)
+ ) {
AddMessage(ctx, "write to static table");
return false;
}
@@ -145,14 +153,30 @@ public:
auto maybeWriteTable = TMaybeNode<TYtWriteTable>(&sink);
YQL_ENSURE(maybeWriteTable);
- auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>();
+ NYtflow::NProto::TQYTSinkMessage sinkSettings;
- auto* rowType = TYqlRowSpecInfo(table.RowSpec()).GetType();
+ {
+ auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>();
- NYtflow::NProto::TQYTSinkMessage sinkSettings;
- sinkSettings.SetCluster(table.Cluster().StringValue());
- sinkSettings.SetPath(table.Name().StringValue());
- sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType));
+ sinkSettings.SetCluster(table.Cluster().StringValue());
+ sinkSettings.SetPath(table.Name().StringValue());
+
+ auto* rowType = maybeWriteTable.Cast().Content().Ref().GetTypeAnn()
+ ->Cast<TListExprType>()->GetItemType();
+
+ sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType));
+ }
+
+ {
+ auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value());
+ auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
+
+ auto tableDesc = State_->TablesData->GetTable(
+ cluster, tableName, 0);
+
+ sinkSettings.SetDoesExist(tableDesc.Meta->DoesExist);
+ sinkSettings.SetTruncate(tableDesc.Intents & TYtTableIntent::Override);
+ }
settings.PackFrom(sinkSettings);
}
diff --git a/yt/yql/providers/ytflow/integration/proto/yt.proto b/yt/yql/providers/ytflow/integration/proto/yt.proto
index 3de6f6b985c..17401e4fccd 100644
--- a/yt/yql/providers/ytflow/integration/proto/yt.proto
+++ b/yt/yql/providers/ytflow/integration/proto/yt.proto
@@ -11,4 +11,6 @@ message TQYTSinkMessage {
optional string Cluster = 1;
optional string Path = 2;
optional bytes RowType = 3;
+ optional bool DoesExist = 5;
+ optional bool Truncate = 6;
}