aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorapollo1321 <apollo1321@yandex-team.com>2024-08-26 15:47:46 +0300
committerapollo1321 <apollo1321@yandex-team.com>2024-08-26 15:59:30 +0300
commitaf06403b33d8d529a8e7aaac332c8bce78420c35 (patch)
treea12b4374da661044b639c73ce2e18ffa0292780d /yt
parent53a12aa26f4cf057a8ea1c83089205349b59b450 (diff)
downloadydb-af06403b33d8d529a8e7aaac332c8bce78420c35.tar.gz
YT-21709: Introduce IRowBatchReader and IRowBatchWriter in client
Add IRowBatchReader and IRowBatchWriter 8a3ec111a7777a7eff6feec2db6933b6a8584f29
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/public.h3
-rw-r--r--yt/yt/client/api/row_batch_reader.h33
-rw-r--r--yt/yt/client/api/row_batch_writer.h31
-rw-r--r--yt/yt/client/api/rpc_proxy/table_reader.cpp4
-rw-r--r--yt/yt/client/api/table_reader.h19
-rw-r--r--yt/yt/client/api/table_writer.h20
-rw-r--r--yt/yt/client/table_client/adapters.cpp140
-rw-r--r--yt/yt/client/table_client/adapters.h16
-rw-r--r--yt/yt/client/unittests/mock/table_reader.h2
9 files changed, 144 insertions, 124 deletions
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index f67dbed45b..349408ae31 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -140,6 +140,9 @@ DECLARE_REFCOUNTED_STRUCT(IInternalClient)
DECLARE_REFCOUNTED_STRUCT(ITransaction)
DECLARE_REFCOUNTED_STRUCT(IStickyTransactionPool)
+DECLARE_REFCOUNTED_STRUCT(IRowBatchReader)
+DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter)
+
DECLARE_REFCOUNTED_STRUCT(ITableReader)
DECLARE_REFCOUNTED_STRUCT(ITableWriter)
diff --git a/yt/yt/client/api/row_batch_reader.h b/yt/yt/client/api/row_batch_reader.h
new file mode 100644
index 0000000000..b6536e240d
--- /dev/null
+++ b/yt/yt/client/api/row_batch_reader.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/chunk_client/ready_event_reader_base.h>
+
+#include <yt/yt/client/table_client/config.h>
+#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/unversioned_row.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IRowBatchReader
+ : public NChunkClient::IReadyEventReaderBase
+{
+ //! Attempts to read a batch of rows.
+ //!
+ //! If the returned row batch is empty, the rows are not immediately
+ //! available, and the client must invoke #GetReadyEvent and wait.
+ //! Returns nullptr when there are no more rows.
+ virtual NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options = {}) = 0;
+
+ //! Returns the name table used for constructing rows.
+ virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IRowBatchReader)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/row_batch_writer.h b/yt/yt/client/api/row_batch_writer.h
new file mode 100644
index 0000000000..db762da2df
--- /dev/null
+++ b/yt/yt/client/api/row_batch_writer.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/chunk_client/writer_base.h>
+
+#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/unversioned_row.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IRowBatchWriter
+ : public NChunkClient::IWriterBase
+{
+ //! Attempts to write a bunch of rows.
+ //!
+ //! If false is returned, the next call to #Write must be made after
+ //! invoking #GetReadyEvent and waiting for it to become ready.
+ virtual bool Write(TRange<NTableClient::TUnversionedRow> rows) = 0;
+
+ //! Returns the name table to be used for constructing rows.
+ virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IRowBatchWriter)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp
index 6ac4eedc2a..7bf5238da3 100644
--- a/yt/yt/client/api/rpc_proxy/table_reader.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp
@@ -32,7 +32,7 @@ public:
const std::vector<TString>& omittedInaccessibleColumns,
TTableSchemaPtr schema,
const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics)
- : Underlying_ (std::move(underlying))
+ : Underlying_(std::move(underlying))
, StartRowIndex_(startRowIndex)
, TableSchema_(std::move(schema))
, OmittedInaccessibleColumns_(omittedInaccessibleColumns)
@@ -65,7 +65,7 @@ public:
return dataStatistics;
}
- TFuture<void> GetReadyEvent() override
+ TFuture<void> GetReadyEvent() const override
{
return ReadyEvent_;
}
diff --git a/yt/yt/client/api/table_reader.h b/yt/yt/client/api/table_reader.h
index d2123db691..927f8dcb9f 100644
--- a/yt/yt/client/api/table_reader.h
+++ b/yt/yt/client/api/table_reader.h
@@ -1,20 +1,16 @@
#pragma once
#include "public.h"
-
-#include <yt/yt/client/table_client/unversioned_reader.h>
+#include "row_batch_reader.h"
#include <yt/yt_proto/yt/client/chunk_client/proto/data_statistics.pb.h>
-#include <yt/yt/core/actions/future.h>
-
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
-// TODO(babenko): consider joining with NTableClient::IRowBatchReader
struct ITableReader
- : public virtual TRefCounted
+ : public IRowBatchReader
{
//! Returns the starting row index within the table.
virtual i64 GetStartRowIndex() const = 0;
@@ -25,17 +21,6 @@ struct ITableReader
//! Returns various data statistics.
virtual NChunkClient::NProto::TDataStatistics GetDataStatistics() const = 0;
- //! Returns an asynchronous flag enabling to wait until data is available.
- virtual TFuture<void> GetReadyEvent() = 0;
-
- //! Attempts to read a bunch of #rows. If true is returned but #rows is empty
- //! the rows are not immediately available and the client must invoke
- //! #GetReadyEvent and wait. False is returned if the end of table was reached.
- virtual NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options = {}) = 0;
-
- //! Returns the name table used for constructing rows.
- virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0;
-
//! Returns schema of the table.
virtual const NTableClient::TTableSchemaPtr& GetTableSchema() const = 0;
diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h
index a9aff96356..fd22dbd1a0 100644
--- a/yt/yt/client/api/table_writer.h
+++ b/yt/yt/client/api/table_writer.h
@@ -1,31 +1,15 @@
#pragma once
#include "public.h"
-
-#include <yt/yt/client/table_client/unversioned_row.h>
-
-#include <yt/yt/core/actions/future.h>
+#include "row_batch_writer.h"
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
struct ITableWriter
- : public virtual TRefCounted
+ : public IRowBatchWriter
{
- //! Attempts to write a bunch of #rows. If false is returned then the rows
- //! are not accepted and the client must invoke #GetReadyEvent and wait.
- virtual bool Write(TRange<NTableClient::TUnversionedRow> rows) = 0;
-
- //! Returns an asynchronous flag enabling to wait until data is written.
- virtual TFuture<void> GetReadyEvent() = 0;
-
- //! Closes the writer. Must be the last call to the writer.
- virtual TFuture<void> Close() = 0;
-
- //! Returns the name table to be used for constructing rows.
- virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0;
-
//! Returns the schema to be used for constructing rows.
virtual const NTableClient::TTableSchemaPtr& GetSchema() const = 0;
};
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index e7b7e00c29..0e78c2f861 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -4,6 +4,8 @@
#include <yt/yt/client/api/table_writer.h>
+#include <yt/yt/client/table_client/schema.h>
+
#include <yt/yt/core/concurrency/scheduler.h>
#include <yt/yt/core/concurrency/throughput_throttler.h>
#include <yt/yt/core/concurrency/periodic_yielder.h>
@@ -13,6 +15,9 @@ namespace NYT::NTableClient {
using namespace NApi;
using namespace NConcurrency;
using namespace NCrypto;
+using namespace NFormats;
+
+using NProfiling::TWallTimer;
////////////////////////////////////////////////////////////////////////////////
@@ -21,14 +26,14 @@ const NLogging::TLogger Logger("TableClientAdapters");
////////////////////////////////////////////////////////////////////////////////
class TApiFromSchemalessWriterAdapter
- : public NApi::ITableWriter
+ : public ITableWriter
{
public:
explicit TApiFromSchemalessWriterAdapter(IUnversionedWriterPtr underlyingWriter)
: UnderlyingWriter_(std::move(underlyingWriter))
{ }
- bool Write(TRange<NTableClient::TUnversionedRow> rows) override
+ bool Write(TRange<TUnversionedRow> rows) override
{
return UnderlyingWriter_->Write(rows);
}
@@ -43,12 +48,12 @@ public:
return UnderlyingWriter_->Close();
}
- const NTableClient::TNameTablePtr& GetNameTable() const override
+ const TNameTablePtr& GetNameTable() const override
{
return UnderlyingWriter_->GetNameTable();
}
- const NTableClient::TTableSchemaPtr& GetSchema() const override
+ const TTableSchemaPtr& GetSchema() const override
{
return UnderlyingWriter_->GetSchema();
}
@@ -57,7 +62,7 @@ private:
const IUnversionedWriterPtr UnderlyingWriter_;
};
-NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter(
+ITableWriterPtr CreateApiFromSchemalessWriterAdapter(
IUnversionedWriterPtr underlyingWriter)
{
return New<TApiFromSchemalessWriterAdapter>(std::move(underlyingWriter));
@@ -69,11 +74,14 @@ class TSchemalessApiFromWriterAdapter
: public IUnversionedWriter
{
public:
- explicit TSchemalessApiFromWriterAdapter(NApi::ITableWriterPtr underlyingWriter)
+ TSchemalessApiFromWriterAdapter(
+ IRowBatchWriterPtr underlyingWriter,
+ TTableSchemaPtr schema)
: UnderlyingWriter_(std::move(underlyingWriter))
+ , Schema_(std::move(schema))
{ }
- bool Write(TRange<NTableClient::TUnversionedRow> rows) override
+ bool Write(TRange<TUnversionedRow> rows) override
{
return UnderlyingWriter_->Write(rows);
}
@@ -88,14 +96,14 @@ public:
return UnderlyingWriter_->Close();
}
- const NTableClient::TNameTablePtr& GetNameTable() const override
+ const TNameTablePtr& GetNameTable() const override
{
return UnderlyingWriter_->GetNameTable();
}
- const NTableClient::TTableSchemaPtr& GetSchema() const override
+ const TTableSchemaPtr& GetSchema() const override
{
- return UnderlyingWriter_->GetSchema();
+ return Schema_;
}
std::optional<TMD5Hash> GetDigest() const override
@@ -104,19 +112,26 @@ public:
}
private:
- const NApi::ITableWriterPtr UnderlyingWriter_;
+ const IRowBatchWriterPtr UnderlyingWriter_;
+ const TTableSchemaPtr Schema_;
};
IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter(
- NApi::ITableWriterPtr underlyingWriter)
+ IRowBatchWriterPtr underlyingWriter)
+{
+ return New<TSchemalessApiFromWriterAdapter>(std::move(underlyingWriter), New<TTableSchema>());
+}
+
+IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter(
+ ITableWriterPtr underlyingWriter)
{
- return New<TSchemalessApiFromWriterAdapter>(std::move(underlyingWriter));
+ return New<TSchemalessApiFromWriterAdapter>(underlyingWriter, underlyingWriter->GetSchema());
}
////////////////////////////////////////////////////////////////////////////////
void PipeReaderToWriter(
- const ITableReaderPtr& reader,
+ const IRowBatchReaderPtr& reader,
const IUnversionedRowsetWriterPtr& writer,
const TPipeReaderToWriterOptions& options)
{
@@ -179,78 +194,51 @@ void PipeReaderToWriter(
}
void PipeReaderToWriterByBatches(
- const ITableReaderPtr& reader,
- const NFormats::ISchemalessFormatWriterPtr& writer,
- const TRowBatchReadOptions& options,
+ const IRowBatchReaderPtr& reader,
+ const ISchemalessFormatWriterPtr& writer,
+ TRowBatchReadOptions options,
+ TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
TDuration pipeDelay)
-try {
- TPeriodicYielder yielder(TDuration::Seconds(1));
-
- while (auto batch = reader->Read(options)) {
- yielder.TryYield();
-
- if (batch->IsEmpty()) {
- WaitFor(reader->GetReadyEvent())
- .ThrowOnError();
- continue;
- }
-
- if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
- TDelayedExecutor::WaitForDuration(pipeDelay);
- }
+{
+ try {
+ TPeriodicYielder yielder(TDuration::Seconds(1));
- if (!writer->WriteBatch(batch)) {
- WaitFor(writer->GetReadyEvent())
- .ThrowOnError();
- }
- }
+ while (auto batch = reader->Read(options)) {
+ yielder.TryYield();
- WaitFor(writer->Close())
- .ThrowOnError();
-} catch (const std::exception& ex) {
- YT_LOG_ERROR(ex, "PipeReaderToWriterByBatches failed");
+ if (batch->IsEmpty()) {
+ WaitFor(reader->GetReadyEvent())
+ .ThrowOnError();
+ continue;
+ }
- THROW_ERROR_EXCEPTION(ex);
-}
+ if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
+ TDelayedExecutor::WaitForDuration(pipeDelay);
+ }
-void PipeReaderToAdaptiveWriterByBatches(
- const ITableReaderPtr& reader,
- const NFormats::ISchemalessFormatWriterPtr& writer,
- TRowBatchReadOptions options,
- TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
- TDuration pipeDelay)
-try {
- TPeriodicYielder yielder(TDuration::Seconds(1));
+ TWallTimer timer(/*start*/ false);
- while (auto batch = reader->Read(options)) {
- yielder.TryYield();
+ if (optionsUpdater) {
+ timer.Start();
+ }
- if (batch->IsEmpty()) {
- WaitFor(reader->GetReadyEvent())
- .ThrowOnError();
- continue;
- }
+ if (!writer->WriteBatch(batch)) {
+ WaitFor(writer->GetReadyEvent())
+ .ThrowOnError();
+ }
- if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
- TDelayedExecutor::WaitForDuration(pipeDelay);
+ if (optionsUpdater) {
+ optionsUpdater(&options, timer.GetElapsedTime());
+ }
}
- NProfiling::TWallTimer timer;
-
- if (!writer->WriteBatch(batch)) {
- WaitFor(writer->GetReadyEvent())
- .ThrowOnError();
- }
+ WaitFor(writer->Close())
+ .ThrowOnError();
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR(ex, "Failed to transfer batches from reader to writer");
- optionsUpdater(&options, timer.GetElapsedTime());
+ THROW_ERROR_EXCEPTION(ex);
}
-
- WaitFor(writer->Close())
- .ThrowOnError();
-} catch (const std::exception& ex) {
- YT_LOG_ERROR(ex, "PipeReaderToAdaptiveWriterByBatches failed");
-
- THROW_ERROR_EXCEPTION(ex);
}
void PipeInputToOutput(
@@ -278,7 +266,7 @@ void PipeInputToOutput(
}
void PipeInputToOutput(
- const NConcurrency::IAsyncInputStreamPtr& input,
+ const IAsyncInputStreamPtr& input,
IOutputStream* output,
i64 bufferBlockSize)
{
@@ -300,7 +288,7 @@ void PipeInputToOutput(
}
void PipeInputToOutput(
- const NConcurrency::IAsyncZeroCopyInputStreamPtr& input,
+ const IAsyncZeroCopyInputStreamPtr& input,
IOutputStream* output)
{
while (true) {
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index df629939e3..f90cbe101d 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -1,7 +1,6 @@
#pragma once
#include "public.h"
-#include "unversioned_writer.h"
#include <yt/yt/client/api/table_reader.h>
@@ -14,6 +13,9 @@ namespace NYT::NTableClient {
////////////////////////////////////////////////////////////////////////////////
IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter(
+ NApi::IRowBatchWriterPtr underlyingWriter);
+
+IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter(
NApi::ITableWriterPtr underlyingWriter);
NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter(
@@ -33,22 +35,16 @@ struct TPipeReaderToWriterOptions
};
void PipeReaderToWriter(
- const NApi::ITableReaderPtr& reader,
+ const NApi::IRowBatchReaderPtr& reader,
const IUnversionedRowsetWriterPtr& writer,
const TPipeReaderToWriterOptions& options);
//! Parameter #pipeDelay is used only for testing.
void PipeReaderToWriterByBatches(
- const NApi::ITableReaderPtr& reader,
- const NFormats::ISchemalessFormatWriterPtr& writer,
- const TRowBatchReadOptions& options,
- TDuration pipeDelay = TDuration::Zero());
-
-void PipeReaderToAdaptiveWriterByBatches(
- const NApi::ITableReaderPtr& reader,
+ const NApi::IRowBatchReaderPtr& reader,
const NFormats::ISchemalessFormatWriterPtr& writer,
TRowBatchReadOptions startingOptions,
- TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
+ TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater = {},
TDuration pipeDelay = TDuration::Zero());
void PipeInputToOutput(
diff --git a/yt/yt/client/unittests/mock/table_reader.h b/yt/yt/client/unittests/mock/table_reader.h
index 7a8cf9825c..36ddda47ab 100644
--- a/yt/yt/client/unittests/mock/table_reader.h
+++ b/yt/yt/client/unittests/mock/table_reader.h
@@ -20,7 +20,7 @@ public:
MOCK_METHOD(NChunkClient::NProto::TDataStatistics, GetDataStatistics, (), (const, override));
- MOCK_METHOD(TFuture<void>, GetReadyEvent, (), (override));
+ MOCK_METHOD(TFuture<void>, GetReadyEvent, (), (const, override));
MOCK_METHOD(NTableClient::IUnversionedRowBatchPtr, Read, (const NTableClient::TRowBatchReadOptions& options), (override));