aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhatsername <whatsername@yandex-team.com>2023-12-01 03:12:29 +0300
committerwhatsername <whatsername@yandex-team.com>2023-12-01 03:43:03 +0300
commitdaa13a041fb3abb4eb601846282c2398523e39d2 (patch)
treef2a0b3da8af96c1d948c49721a80dc36745c96a9
parentd45f28ae1b15b45f5026ae1add338558570a49d9 (diff)
downloadydb-daa13a041fb3abb4eb601846282c2398523e39d2.tar.gz
ROREN-48: Use single table writer in TWriteProtoParDo
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.cpp26
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.h16
2 files changed, 41 insertions, 1 deletions
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp
index 955bba26e1..62593ff21e 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp
@@ -7,6 +7,8 @@
#include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/io/job_writer.h>
+
#include <yt/yt_proto/yt/formats/extension.pb.h>
#include <google/protobuf/unknown_field_set.h>
@@ -191,4 +193,28 @@ void TLenvalProtoTableWriter::Abort()
////////////////////////////////////////////////////////////////////////////////
+TLenvalProtoSingleTableWriter::TLenvalProtoSingleTableWriter(
+ THolder<IProxyOutput> output,
+ const Descriptor* descriptor)
+ : TLenvalProtoTableWriter(std::move(output), {descriptor})
+{ }
+
+void TLenvalProtoSingleTableWriter::AddRow(const Message& row, size_t tableIndex)
+{
+ ValidateProtoDescriptor(row, 0, Descriptors_, false);
+
+ Y_ABORT_UNLESS(row.GetReflection()->GetUnknownFields(row).empty(),
+ "Message has unknown fields. This probably means bug in client code.\n"
+ "Message: %s", row.DebugString().data());
+
+ auto* stream = Output_->GetStream(tableIndex);
+ i32 size = row.ByteSize();
+ stream->Write(&size, sizeof(size));
+ bool serializedOk = row.SerializeToArcadiaStream(stream);
+ Y_ENSURE(serializedOk, "Failed to serialize protobuf message");
+ Output_->OnRowFinished(tableIndex);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h
index 1bf43d2f98..532cf3302b 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.h
+++ b/yt/cpp/mapreduce/io/proto_table_writer.h
@@ -50,7 +50,7 @@ public:
void FinishTable(size_t) override;
void Abort() override;
-private:
+protected:
THolder<IProxyOutput> Output_;
TVector<const ::google::protobuf::Descriptor*> Descriptors_;
};
@@ -65,4 +65,18 @@ TNode MakeNodeFromMessage(const ::google::protobuf::Message& row);
////////////////////////////////////////////////////////////////////////////////
+class TLenvalProtoSingleTableWriter
+ : public TLenvalProtoTableWriter
+{
+public:
+ TLenvalProtoSingleTableWriter(
+ THolder<IProxyOutput> output,
+ const ::google::protobuf::Descriptor* descriptor);
+ ~TLenvalProtoSingleTableWriter() override = default;
+
+ void AddRow(const Message& row, size_t tableIndex) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT