diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-15 09:53:58 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-15 10:05:02 +0300 |
commit | 22d7f650080ab2127ef3a6dc00d9a31273ccc160 (patch) | |
tree | e1a0831b1cb1f290e288985da7eb46de16d8feeb | |
parent | 6d6f1645632e3d0b7a2a2f0e4d3c685dec62d407 (diff) | |
download | ydb-22d7f650080ab2127ef3a6dc00d9a31273ccc160.tar.gz |
YT-21942: Automate selection of buffer row count
ee58dd90305b5dd3c29edfc95a3ddb7c2395d2bf
-rw-r--r-- | yt/yt/client/table_client/adapters.cpp | 51 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.h | 7 |
2 files changed, 57 insertions, 1 deletions
```diff
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index 73a306cfc3..e7b7e00c29 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -1,4 +1,5 @@
 #include "adapters.h"
+
 #include "row_batch.h"
 
 #include <yt/yt/client/api/table_writer.h>
@@ -15,6 +16,10 @@ using namespace NCrypto;
 
 ////////////////////////////////////////////////////////////////////////////////
 
+const NLogging::TLogger Logger("TableClientAdapters");
+
+////////////////////////////////////////////////////////////////////////////////
+
 class TApiFromSchemalessWriterAdapter
     : public NApi::ITableWriter
 {
@@ -178,7 +183,7 @@ void PipeReaderToWriterByBatches(
     const NFormats::ISchemalessFormatWriterPtr& writer,
     const TRowBatchReadOptions& options,
     TDuration pipeDelay)
-{
+try {
     TPeriodicYielder yielder(TDuration::Seconds(1));
 
     while (auto batch = reader->Read(options)) {
@@ -202,6 +207,50 @@ void PipeReaderToWriterByBatches(
 
     WaitFor(writer->Close())
         .ThrowOnError();
+} catch (const std::exception& ex) {
+    YT_LOG_ERROR(ex, "PipeReaderToWriterByBatches failed");
+
+    THROW_ERROR_EXCEPTION(ex);
+}
+
+void PipeReaderToAdaptiveWriterByBatches(
+    const ITableReaderPtr& reader,
+    const NFormats::ISchemalessFormatWriterPtr& writer,
+    TRowBatchReadOptions options,
+    TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
+    TDuration pipeDelay)
+try {
+    TPeriodicYielder yielder(TDuration::Seconds(1));
+
+    while (auto batch = reader->Read(options)) {
+        yielder.TryYield();
+
+        if (batch->IsEmpty()) {
+            WaitFor(reader->GetReadyEvent())
+                .ThrowOnError();
+            continue;
+        }
+
+        if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
+            TDelayedExecutor::WaitForDuration(pipeDelay);
+        }
+
+        NProfiling::TWallTimer timer;
+
+        if (!writer->WriteBatch(batch)) {
+            WaitFor(writer->GetReadyEvent())
+                .ThrowOnError();
+        }
+
+        optionsUpdater(&options, timer.GetElapsedTime());
+    }
+
+    WaitFor(writer->Close())
+        .ThrowOnError();
+} catch (const std::exception& ex) {
+    YT_LOG_ERROR(ex, "PipeReaderToAdaptiveWriterByBatches failed");
+
+    THROW_ERROR_EXCEPTION(ex);
 }
 
 void PipeInputToOutput(
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index 9994dd1c8c..df629939e3 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -44,6 +44,13 @@ void PipeReaderToWriterByBatches(
     const TRowBatchReadOptions& options,
     TDuration pipeDelay = TDuration::Zero());
 
+void PipeReaderToAdaptiveWriterByBatches(
+    const NApi::ITableReaderPtr& reader,
+    const NFormats::ISchemalessFormatWriterPtr& writer,
+    TRowBatchReadOptions startingOptions,
+    TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
+    TDuration pipeDelay = TDuration::Zero());
+
 void PipeInputToOutput(
     IInputStream* input,
     IOutputStream* output,
```