diff options
| author | achains <[email protected]> | 2025-11-10 16:32:52 +0300 |
|---|---|---|
| committer | achains <[email protected]> | 2025-11-10 17:26:03 +0300 |
| commit | 28244e705d32f688896c3f2986d012d74fc1e487 (patch) | |
| tree | c107ae43ed3c34840ba830965fb45e987f647fec /yt/cpp/mapreduce/io/node_table_writer.cpp | |
| parent | 4a9fcc0815e9f0896644f22f0de847abffa8ebcc (diff) | |
YT-26425: Distributed API http proxy light requests
* Changelog entry
Type: feature
Component: cpp-sdk
Support distributed API in C\+\+ SDK
<Message for release notes>
commit_hash:689a3c978864fa4623f3b38ce031faa96532b3fe
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_writer.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_writer.cpp | 65 |
1 files changed, 48 insertions, 17 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp index c516c7b4ee1..3891a01490d 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.cpp +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -3,15 +3,40 @@ #include <yt/cpp/mapreduce/common/node_visitor.h> #include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> +#include <library/cpp/yson/node/node_io.h> + #include <library/cpp/yson/writer.h> namespace NYT { //////////////////////////////////////////////////////////////////////////////// +void WriteRow(::NYson::TYsonWriter* writer, const TNode& row) +{ + if (row.HasAttributes()) { + ythrow TIOException() << "Row cannot have attributes"; + } + + static const TNode emptyMap = TNode::CreateMap(); + const TNode* outRow = &emptyMap; + if (row.GetType() != TNode::Undefined) { + if (!row.IsMap()) { + ythrow TIOException() << "Row should be a map node"; + } else { + outRow = &row; + } + } + + writer->OnListItem(); + + TNodeVisitor visitor(writer); + visitor.Visit(*outRow); +} + TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format) : Output_(output.Release()) { @@ -40,25 +65,9 @@ void TNodeTableWriter::FinishTable(size_t tableIndex) { void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex) { - if (row.HasAttributes()) { - ythrow TIOException() << "Row cannot have attributes"; - } - - static const TNode emptyMap = TNode::CreateMap(); - const TNode* outRow = &emptyMap; - if (row.GetType() != TNode::Undefined) { - if (!row.IsMap()) { - ythrow TIOException() << "Row should be a map node"; - } else { - outRow = &row; - } - } - auto* writer = Writers_[tableIndex].get(); - writer->OnListItem(); - TNodeVisitor visitor(writer); - visitor.Visit(*outRow); + WriteRow(writer, row); Output_->OnRowFinished(tableIndex); } @@ -74,4 +83,26 @@ void TNodeTableWriter::Abort() //////////////////////////////////////////////////////////////////////////////// +TNodeTableFragmentWriter::TNodeTableFragmentWriter(std::unique_ptr<IOutputStreamWithResponse> output, ::NYson::EYsonFormat format) + : Output_(std::move(output)) + , Writer_(std::make_unique<::NYson::TYsonWriter>(Output_.get(), format, NYT::NYson::EYsonType::ListFragment)) +{ } + +TWriteTableFragmentResult TNodeTableFragmentWriter::GetWriteFragmentResult() const +{ + return TWriteTableFragmentResult(NodeFromYsonString(Output_->GetResponse())); +} + +void TNodeTableFragmentWriter::AddRow(const TNode& row) +{ + WriteRow(Writer_.get(), row); +} + +void TNodeTableFragmentWriter::Finish() +{ + Output_->Finish(); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT |
