diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-19 17:14:44 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-19 17:14:44 +0300 |
commit | 9605d5b8512fc5cde8e5a00cd42187b96bccbbdb (patch) | |
tree | f65d6b45c55b9eab480d62876b5be248aefff033 | |
parent | b7b0ba78492d2da7175f3ea46d2d1c7326cbf7ba (diff) | |
download | ydb-9605d5b8512fc5cde8e5a00cd42187b96bccbbdb.tar.gz |
fix.
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp | 11 |
2 files changed, 12 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp index 684bf5b7bf3..99e491670ae 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp @@ -95,7 +95,10 @@ private: YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async."); - if (dqWriteStatus != TStatus::Ok) + if (TStatus::Repeat == dqWriteStatus) + output->SetState(TExprNode::EState::ExecutionRequired); + + if (TStatus::Ok != dqWriteStatus) return dqWriteStatus; output->SetState(TExprNode::EState::ExecutionComplete); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index d6aa30301d0..927269d61cc 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -141,15 +141,21 @@ private: return TStatus::Ok; } - TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) { + TStatus HandleOutput(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) { return TStatus::Error; } - if (!EnsureFlowType(*input->Child(TS3SinkOutput::idx_Input), ctx)) { + const auto source = input->Child(TS3SinkOutput::idx_Input); + if (!EnsureNewSeqType<false, false>(*source, ctx)) { return TStatus::Error; } + if (ETypeAnnotationKind::Stream == source->GetTypeAnn()->GetKind()) { + output = ctx.ChangeChild(*input, TS3SinkOutput::idx_Input, ctx.NewCallable(source->Pos(), "ToFlow", {input->ChildPtr(TS3SinkOutput::idx_Input)})); + return TStatus::Repeat; + } + if (!EnsureAtom(*input->Child(TS3SinkOutput::idx_Format), ctx)) { return TStatus::Error; } @@ -162,7 +168,6 @@ private: return TStatus::Error; } - const auto source = input->Child(TS3SinkOutput::idx_Input); const auto itemType = source->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType(); if (!EnsureStructType(source->Pos(), *itemType, ctx)) { return TStatus::Error; |