summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/node_table_writer.cpp
diff options
context:
space:
mode:
authorachains <[email protected]>2025-11-10 16:32:52 +0300
committerachains <[email protected]>2025-11-10 17:26:03 +0300
commit28244e705d32f688896c3f2986d012d74fc1e487 (patch)
treec107ae43ed3c34840ba830965fb45e987f647fec /yt/cpp/mapreduce/io/node_table_writer.cpp
parent4a9fcc0815e9f0896644f22f0de847abffa8ebcc (diff)
YT-26425: Distributed API http proxy light requests
* Changelog entry Type: feature Component: cpp-sdk Support distributed API in C\+\+ SDK <Message for release notes> commit_hash:689a3c978864fa4623f3b38ce031faa96532b3fe
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_writer.cpp')
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.cpp65
1 files changed, 48 insertions, 17 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp
index c516c7b4ee1..3891a01490d 100644
--- a/yt/cpp/mapreduce/io/node_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/node_table_writer.cpp
@@ -3,15 +3,40 @@
#include <yt/cpp/mapreduce/common/node_visitor.h>
#include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <library/cpp/yson/node/node_io.h>
+
#include <library/cpp/yson/writer.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+void WriteRow(::NYson::TYsonWriter* writer, const TNode& row)
+{
+ if (row.HasAttributes()) {
+ ythrow TIOException() << "Row cannot have attributes";
+ }
+
+ static const TNode emptyMap = TNode::CreateMap();
+ const TNode* outRow = &emptyMap;
+ if (row.GetType() != TNode::Undefined) {
+ if (!row.IsMap()) {
+ ythrow TIOException() << "Row should be a map node";
+ } else {
+ outRow = &row;
+ }
+ }
+
+ writer->OnListItem();
+
+ TNodeVisitor visitor(writer);
+ visitor.Visit(*outRow);
+}
+
TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format)
: Output_(output.Release())
{
@@ -40,25 +65,9 @@ void TNodeTableWriter::FinishTable(size_t tableIndex) {
void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex)
{
- if (row.HasAttributes()) {
- ythrow TIOException() << "Row cannot have attributes";
- }
-
- static const TNode emptyMap = TNode::CreateMap();
- const TNode* outRow = &emptyMap;
- if (row.GetType() != TNode::Undefined) {
- if (!row.IsMap()) {
- ythrow TIOException() << "Row should be a map node";
- } else {
- outRow = &row;
- }
- }
-
auto* writer = Writers_[tableIndex].get();
- writer->OnListItem();
- TNodeVisitor visitor(writer);
- visitor.Visit(*outRow);
+ WriteRow(writer, row);
Output_->OnRowFinished(tableIndex);
}
@@ -74,4 +83,26 @@ void TNodeTableWriter::Abort()
////////////////////////////////////////////////////////////////////////////////
+TNodeTableFragmentWriter::TNodeTableFragmentWriter(std::unique_ptr<IOutputStreamWithResponse> output, ::NYson::EYsonFormat format)
+ : Output_(std::move(output))
+ , Writer_(std::make_unique<::NYson::TYsonWriter>(Output_.get(), format, NYT::NYson::EYsonType::ListFragment))
+{ }
+
+TWriteTableFragmentResult TNodeTableFragmentWriter::GetWriteFragmentResult() const
+{
+ return TWriteTableFragmentResult(NodeFromYsonString(Output_->GetResponse()));
+}
+
+void TNodeTableFragmentWriter::AddRow(const TNode& row)
+{
+ WriteRow(Writer_.get(), row);
+}
+
+void TNodeTableFragmentWriter::Finish()
+{
+ Output_->Finish();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT