aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/job_writer.cpp
diff options
context:
space:
mode:
authorwhatsername <whatsername@yandex-team.com>2023-11-13 18:02:25 +0300
committerwhatsername <whatsername@yandex-team.com>2023-11-13 18:39:48 +0300
commitee08d73749380ebac21845bdd8e8c28742bea885 (patch)
tree314ebba1db4fd744528893921e36148869c330bf /yt/cpp/mapreduce/io/job_writer.cpp
parent4adb48ccbdd0373aa949f90e1c7aa5d6cee7938a (diff)
downloadydb-ee08d73749380ebac21845bdd8e8c28742bea885.tar.gz
ROREN-48: Introduce SingleTableJobWriter
Diffstat (limited to 'yt/cpp/mapreduce/io/job_writer.cpp')
-rw-r--r--yt/cpp/mapreduce/io/job_writer.cpp54
1 files changed, 44 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp
index cbd3f16c58..f98a538e19 100644
--- a/yt/cpp/mapreduce/io/job_writer.cpp
+++ b/yt/cpp/mapreduce/io/job_writer.cpp
@@ -5,36 +5,40 @@
#include <util/system/file.h>
namespace NYT {
+namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
-TJobWriter::TStream::TStream(int fd)
- : TStream(Duplicate(fd))
+TJobWriterStream::TJobWriterStream(int fd)
+ : TJobWriterStream(Duplicate(fd))
{ }
-TJobWriter::TStream::TStream(const TFile& file)
+TJobWriterStream::TJobWriterStream(const TFile& file)
: FdFile(file)
, FdOutput(FdFile)
- , BufferedOutput(&FdOutput, BUFFER_SIZE)
+ , BufferedOutput(&FdOutput, BufferSize)
{ }
-TJobWriter::TStream::~TStream()
-{
-}
+TJobWriterStream::~TJobWriterStream()
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
////////////////////////////////////////////////////////////////////////////////
TJobWriter::TJobWriter(size_t outputTableCount)
{
for (size_t i = 0; i < outputTableCount; ++i) {
- Streams_.emplace_back(MakeHolder<TStream>(int(i * 3 + 1)));
+ Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(int(i * 3 + 1)));
}
}
TJobWriter::TJobWriter(const TVector<TFile>& fileList)
{
for (const auto& f : fileList) {
- Streams_.emplace_back(MakeHolder<TStream>(f));
+ Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(f));
}
}
@@ -58,7 +62,7 @@ void TJobWriter::OnRowFinished(size_t)
size_t TJobWriter::GetBufferMemoryUsage() const
{
- return TStream::BUFFER_SIZE * GetStreamCount();
+ return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
}
////////////////////////////////////////////////////////////////////////////////
@@ -70,4 +74,34 @@ THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
////////////////////////////////////////////////////////////////////////////////
+TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex)
+ : TableIndex_(tableIndex)
+ , Stream_(MakeHolder<NDetail::TJobWriterStream>(int(tableIndex * 3 + 1)))
+{ }
+
+size_t TSingleStreamJobWriter::GetStreamCount() const
+{
+ return 1;
+}
+
+IOutputStream* TSingleStreamJobWriter::GetStream(size_t tableIndex) const
+{
+ if (tableIndex != TableIndex_) {
+ ythrow TIOException() <<
+ "Table index " << tableIndex <<
+ " does not match this SignleTableJobWriter with index " << TableIndex_;
+ }
+ return &Stream_->BufferedOutput;
+}
+
+void TSingleStreamJobWriter::OnRowFinished(size_t)
+{ }
+
+size_t TSingleStreamJobWriter::GetBufferMemoryUsage() const
+{
+ return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT