aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-15 09:53:58 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-15 10:05:02 +0300
commit22d7f650080ab2127ef3a6dc00d9a31273ccc160 (patch)
treee1a0831b1cb1f290e288985da7eb46de16d8feeb
parent6d6f1645632e3d0b7a2a2f0e4d3c685dec62d407 (diff)
downloadydb-22d7f650080ab2127ef3a6dc00d9a31273ccc160.tar.gz
YT-21942: Automate selection of buffer row count
ee58dd90305b5dd3c29edfc95a3ddb7c2395d2bf
-rw-r--r--yt/yt/client/table_client/adapters.cpp51
-rw-r--r--yt/yt/client/table_client/adapters.h7
2 files changed, 57 insertions, 1 deletions
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index 73a306cfc3..e7b7e00c29 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -1,4 +1,5 @@
#include "adapters.h"
+
#include "row_batch.h"
#include <yt/yt/client/api/table_writer.h>
@@ -15,6 +16,10 @@ using namespace NCrypto;
////////////////////////////////////////////////////////////////////////////////
+const NLogging::TLogger Logger("TableClientAdapters");
+
+////////////////////////////////////////////////////////////////////////////////
+
class TApiFromSchemalessWriterAdapter
: public NApi::ITableWriter
{
@@ -178,7 +183,7 @@ void PipeReaderToWriterByBatches(
const NFormats::ISchemalessFormatWriterPtr& writer,
const TRowBatchReadOptions& options,
TDuration pipeDelay)
-{
+try {
TPeriodicYielder yielder(TDuration::Seconds(1));
while (auto batch = reader->Read(options)) {
@@ -202,6 +207,50 @@ void PipeReaderToWriterByBatches(
WaitFor(writer->Close())
.ThrowOnError();
+} catch (const std::exception& ex) {
+ YT_LOG_ERROR(ex, "PipeReaderToWriterByBatches failed");
+
+ THROW_ERROR_EXCEPTION(ex);
+}
+
+void PipeReaderToAdaptiveWriterByBatches(
+ const ITableReaderPtr& reader,
+ const NFormats::ISchemalessFormatWriterPtr& writer,
+ TRowBatchReadOptions options,
+ TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
+ TDuration pipeDelay)
+try {
+ TPeriodicYielder yielder(TDuration::Seconds(1));
+
+ while (auto batch = reader->Read(options)) {
+ yielder.TryYield();
+
+ if (batch->IsEmpty()) {
+ WaitFor(reader->GetReadyEvent())
+ .ThrowOnError();
+ continue;
+ }
+
+ if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
+ TDelayedExecutor::WaitForDuration(pipeDelay);
+ }
+
+ NProfiling::TWallTimer timer;
+
+ if (!writer->WriteBatch(batch)) {
+ WaitFor(writer->GetReadyEvent())
+ .ThrowOnError();
+ }
+
+ optionsUpdater(&options, timer.GetElapsedTime());
+ }
+
+ WaitFor(writer->Close())
+ .ThrowOnError();
+} catch (const std::exception& ex) {
+ YT_LOG_ERROR(ex, "PipeReaderToAdaptiveWriterByBatches failed");
+
+ THROW_ERROR_EXCEPTION(ex);
}
void PipeInputToOutput(
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index 9994dd1c8c..df629939e3 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -44,6 +44,13 @@ void PipeReaderToWriterByBatches(
const TRowBatchReadOptions& options,
TDuration pipeDelay = TDuration::Zero());
+void PipeReaderToAdaptiveWriterByBatches(
+ const NApi::ITableReaderPtr& reader,
+ const NFormats::ISchemalessFormatWriterPtr& writer,
+ TRowBatchReadOptions startingOptions,
+ TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
+ TDuration pipeDelay = TDuration::Zero());
+
void PipeInputToOutput(
IInputStream* input,
IOutputStream* output,