diff options
author | whatsername <whatsername@yandex-team.com> | 2023-11-13 18:02:25 +0300 |
---|---|---|
committer | whatsername <whatsername@yandex-team.com> | 2023-11-13 18:39:48 +0300 |
commit | ee08d73749380ebac21845bdd8e8c28742bea885 (patch) | |
tree | 314ebba1db4fd744528893921e36148869c330bf /yt/cpp/mapreduce/io/job_writer.cpp | |
parent | 4adb48ccbdd0373aa949f90e1c7aa5d6cee7938a (diff) | |
download | ydb-ee08d73749380ebac21845bdd8e8c28742bea885.tar.gz |
ROREN-48: Introduce SingleTableJobWriter
Diffstat (limited to 'yt/cpp/mapreduce/io/job_writer.cpp')
-rw-r--r-- | yt/cpp/mapreduce/io/job_writer.cpp | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp index cbd3f16c58..f98a538e19 100644 --- a/yt/cpp/mapreduce/io/job_writer.cpp +++ b/yt/cpp/mapreduce/io/job_writer.cpp @@ -5,36 +5,40 @@ #include <util/system/file.h> namespace NYT { +namespace NDetail { //////////////////////////////////////////////////////////////////////////////// -TJobWriter::TStream::TStream(int fd) - : TStream(Duplicate(fd)) +TJobWriterStream::TJobWriterStream(int fd) + : TJobWriterStream(Duplicate(fd)) { } -TJobWriter::TStream::TStream(const TFile& file) +TJobWriterStream::TJobWriterStream(const TFile& file) : FdFile(file) , FdOutput(FdFile) - , BufferedOutput(&FdOutput, BUFFER_SIZE) + , BufferedOutput(&FdOutput, BufferSize) { } -TJobWriter::TStream::~TStream() -{ -} +TJobWriterStream::~TJobWriterStream() +{ } + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail //////////////////////////////////////////////////////////////////////////////// TJobWriter::TJobWriter(size_t outputTableCount) { for (size_t i = 0; i < outputTableCount; ++i) { - Streams_.emplace_back(MakeHolder<TStream>(int(i * 3 + 1))); + Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(int(i * 3 + 1))); } } TJobWriter::TJobWriter(const TVector<TFile>& fileList) { for (const auto& f : fileList) { - Streams_.emplace_back(MakeHolder<TStream>(f)); + Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(f)); } } @@ -58,7 +62,7 @@ void TJobWriter::OnRowFinished(size_t) size_t TJobWriter::GetBufferMemoryUsage() const { - return TStream::BUFFER_SIZE * GetStreamCount(); + return NDetail::TJobWriterStream::BufferSize * GetStreamCount(); } //////////////////////////////////////////////////////////////////////////////// @@ -70,4 +74,34 @@ THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount) //////////////////////////////////////////////////////////////////////////////// +TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex) + : TableIndex_(tableIndex) + , Stream_(MakeHolder<NDetail::TJobWriterStream>(int(tableIndex * 3 + 1))) +{ } + +size_t TSingleStreamJobWriter::GetStreamCount() const +{ + return 1; +} + +IOutputStream* TSingleStreamJobWriter::GetStream(size_t tableIndex) const +{ + if (tableIndex != TableIndex_) { + ythrow TIOException() << + "Table index " << tableIndex << + " does not match this SignleTableJobWriter with index " << TableIndex_; + } + return &Stream_->BufferedOutput; +} + +void TSingleStreamJobWriter::OnRowFinished(size_t) +{ } + +size_t TSingleStreamJobWriter::GetBufferMemoryUsage() const +{ + return NDetail::TJobWriterStream::BufferSize * GetStreamCount(); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT |