aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-19 17:14:44 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-19 17:14:44 +0300
commit9605d5b8512fc5cde8e5a00cd42187b96bccbbdb (patch)
treef65d6b45c55b9eab480d62876b5be248aefff033
parentb7b0ba78492d2da7175f3ea46d2d1c7326cbf7ba (diff)
downloadydb-9605d5b8512fc5cde8e5a00cd42187b96bccbbdb.tar.gz
fix.
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp5
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp11
2 files changed, 12 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
index 684bf5b7bf3..99e491670ae 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
@@ -95,7 +95,10 @@ private:
YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async.");
- if (dqWriteStatus != TStatus::Ok)
+ if (TStatus::Repeat == dqWriteStatus)
+ output->SetState(TExprNode::EState::ExecutionRequired);
+
+ if (TStatus::Ok != dqWriteStatus)
return dqWriteStatus;
output->SetState(TExprNode::EState::ExecutionComplete);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index d6aa30301d0..927269d61cc 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -141,15 +141,21 @@ private:
return TStatus::Ok;
}
- TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) {
+ TStatus HandleOutput(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) {
return TStatus::Error;
}
- if (!EnsureFlowType(*input->Child(TS3SinkOutput::idx_Input), ctx)) {
+ const auto source = input->Child(TS3SinkOutput::idx_Input);
+ if (!EnsureNewSeqType<false, false>(*source, ctx)) {
return TStatus::Error;
}
+ if (ETypeAnnotationKind::Stream == source->GetTypeAnn()->GetKind()) {
+ output = ctx.ChangeChild(*input, TS3SinkOutput::idx_Input, ctx.NewCallable(source->Pos(), "ToFlow", {input->ChildPtr(TS3SinkOutput::idx_Input)}));
+ return TStatus::Repeat;
+ }
+
if (!EnsureAtom(*input->Child(TS3SinkOutput::idx_Format), ctx)) {
return TStatus::Error;
}
@@ -162,7 +168,6 @@ private:
return TStatus::Error;
}
- const auto source = input->Child(TS3SinkOutput::idx_Input);
const auto itemType = source->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType();
if (!EnsureStructType(source->Pos(), *itemType, ctx)) {
return TStatus::Error;