diff options
author | apollo1321 <apollo1321@yandex-team.com> | 2024-08-26 15:47:46 +0300 |
---|---|---|
committer | apollo1321 <apollo1321@yandex-team.com> | 2024-08-26 15:59:30 +0300 |
commit | af06403b33d8d529a8e7aaac332c8bce78420c35 (patch) | |
tree | a12b4374da661044b639c73ce2e18ffa0292780d /yt | |
parent | 53a12aa26f4cf057a8ea1c83089205349b59b450 (diff) | |
download | ydb-af06403b33d8d529a8e7aaac332c8bce78420c35.tar.gz |
YT-21709: Introduce IRowBatchReader and IRowBatchWriter in client
Add IRowBatchReader and IRowBatchWriter
8a3ec111a7777a7eff6feec2db6933b6a8584f29
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/client/api/public.h | 3 | ||||
-rw-r--r-- | yt/yt/client/api/row_batch_reader.h | 33 | ||||
-rw-r--r-- | yt/yt/client/api/row_batch_writer.h | 31 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_reader.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/api/table_reader.h | 19 | ||||
-rw-r--r-- | yt/yt/client/api/table_writer.h | 20 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.cpp | 140 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.h | 16 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/table_reader.h | 2 |
9 files changed, 144 insertions, 124 deletions
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index f67dbed45b..349408ae31 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -140,6 +140,9 @@ DECLARE_REFCOUNTED_STRUCT(IInternalClient) DECLARE_REFCOUNTED_STRUCT(ITransaction) DECLARE_REFCOUNTED_STRUCT(IStickyTransactionPool) +DECLARE_REFCOUNTED_STRUCT(IRowBatchReader) +DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter) + DECLARE_REFCOUNTED_STRUCT(ITableReader) DECLARE_REFCOUNTED_STRUCT(ITableWriter) diff --git a/yt/yt/client/api/row_batch_reader.h b/yt/yt/client/api/row_batch_reader.h new file mode 100644 index 0000000000..b6536e240d --- /dev/null +++ b/yt/yt/client/api/row_batch_reader.h @@ -0,0 +1,33 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/chunk_client/ready_event_reader_base.h> + +#include <yt/yt/client/table_client/config.h> +#include <yt/yt/client/table_client/public.h> +#include <yt/yt/client/table_client/unversioned_row.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +struct IRowBatchReader + : public NChunkClient::IReadyEventReaderBase +{ + //! Attempts to read a batch of rows. + //! + //! If the returned row batch is empty, the rows are not immediately + //! available, and the client must invoke #GetReadyEvent and wait. + //! Returns nullptr when there are no more rows. + virtual NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options = {}) = 0; + + //! Returns the name table used for constructing rows. + virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IRowBatchReader) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/row_batch_writer.h b/yt/yt/client/api/row_batch_writer.h new file mode 100644 index 0000000000..db762da2df --- /dev/null +++ b/yt/yt/client/api/row_batch_writer.h @@ -0,0 +1,31 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/chunk_client/writer_base.h> + +#include <yt/yt/client/table_client/public.h> +#include <yt/yt/client/table_client/unversioned_row.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +struct IRowBatchWriter + : public NChunkClient::IWriterBase +{ + //! Attempts to write a bunch of rows. + //! + //! If false is returned, the next call to #Write must be made after + //! invoking #GetReadyEvent and waiting for it to become ready. + virtual bool Write(TRange<NTableClient::TUnversionedRow> rows) = 0; + + //! Returns the name table to be used for constructing rows. + virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IRowBatchWriter) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp index 6ac4eedc2a..7bf5238da3 100644 --- a/yt/yt/client/api/rpc_proxy/table_reader.cpp +++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp @@ -32,7 +32,7 @@ public: const std::vector<TString>& omittedInaccessibleColumns, TTableSchemaPtr schema, const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics) - : Underlying_ (std::move(underlying)) + : Underlying_(std::move(underlying)) , StartRowIndex_(startRowIndex) , TableSchema_(std::move(schema)) , OmittedInaccessibleColumns_(omittedInaccessibleColumns) @@ -65,7 +65,7 @@ public: return dataStatistics; } - TFuture<void> GetReadyEvent() override + TFuture<void> GetReadyEvent() const override { return ReadyEvent_; } diff --git a/yt/yt/client/api/table_reader.h b/yt/yt/client/api/table_reader.h index d2123db691..927f8dcb9f 100644 --- a/yt/yt/client/api/table_reader.h +++ b/yt/yt/client/api/table_reader.h @@ -1,20 +1,16 @@ #pragma once #include "public.h" - -#include <yt/yt/client/table_client/unversioned_reader.h> +#include "row_batch_reader.h" #include <yt/yt_proto/yt/client/chunk_client/proto/data_statistics.pb.h> -#include <yt/yt/core/actions/future.h> - namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// -// TODO(babenko): consider joining with NTableClient::IRowBatchReader struct ITableReader - : public virtual TRefCounted + : public IRowBatchReader { //! Returns the starting row index within the table. virtual i64 GetStartRowIndex() const = 0; @@ -25,17 +21,6 @@ struct ITableReader //! Returns various data statistics. virtual NChunkClient::NProto::TDataStatistics GetDataStatistics() const = 0; - //! Returns an asynchronous flag enabling to wait until data is available. - virtual TFuture<void> GetReadyEvent() = 0; - - //! Attempts to read a bunch of #rows. If true is returned but #rows is empty - //! the rows are not immediately available and the client must invoke - //! #GetReadyEvent and wait. False is returned if the end of table was reached. - virtual NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options = {}) = 0; - - //! Returns the name table used for constructing rows. - virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0; - //! Returns schema of the table. virtual const NTableClient::TTableSchemaPtr& GetTableSchema() const = 0; diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h index a9aff96356..fd22dbd1a0 100644 --- a/yt/yt/client/api/table_writer.h +++ b/yt/yt/client/api/table_writer.h @@ -1,31 +1,15 @@ #pragma once #include "public.h" - -#include <yt/yt/client/table_client/unversioned_row.h> - -#include <yt/yt/core/actions/future.h> +#include "row_batch_writer.h" namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// struct ITableWriter - : public virtual TRefCounted + : public IRowBatchWriter { - //! Attempts to write a bunch of #rows. If false is returned then the rows - //! are not accepted and the client must invoke #GetReadyEvent and wait. - virtual bool Write(TRange<NTableClient::TUnversionedRow> rows) = 0; - - //! Returns an asynchronous flag enabling to wait until data is written. - virtual TFuture<void> GetReadyEvent() = 0; - - //! Closes the writer. Must be the last call to the writer. - virtual TFuture<void> Close() = 0; - - //! Returns the name table to be used for constructing rows. - virtual const NTableClient::TNameTablePtr& GetNameTable() const = 0; - //! Returns the schema to be used for constructing rows. virtual const NTableClient::TTableSchemaPtr& GetSchema() const = 0; }; diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp index e7b7e00c29..0e78c2f861 100644 --- a/yt/yt/client/table_client/adapters.cpp +++ b/yt/yt/client/table_client/adapters.cpp @@ -4,6 +4,8 @@ #include <yt/yt/client/api/table_writer.h> +#include <yt/yt/client/table_client/schema.h> + #include <yt/yt/core/concurrency/scheduler.h> #include <yt/yt/core/concurrency/throughput_throttler.h> #include <yt/yt/core/concurrency/periodic_yielder.h> @@ -13,6 +15,9 @@ namespace NYT::NTableClient { using namespace NApi; using namespace NConcurrency; using namespace NCrypto; +using namespace NFormats; + +using NProfiling::TWallTimer; //////////////////////////////////////////////////////////////////////////////// @@ -21,14 +26,14 @@ const NLogging::TLogger Logger("TableClientAdapters"); //////////////////////////////////////////////////////////////////////////////// class TApiFromSchemalessWriterAdapter - : public NApi::ITableWriter + : public ITableWriter { public: explicit TApiFromSchemalessWriterAdapter(IUnversionedWriterPtr underlyingWriter) : UnderlyingWriter_(std::move(underlyingWriter)) { } - bool Write(TRange<NTableClient::TUnversionedRow> rows) override + bool Write(TRange<TUnversionedRow> rows) override { return UnderlyingWriter_->Write(rows); } @@ -43,12 +48,12 @@ public: return UnderlyingWriter_->Close(); } - const NTableClient::TNameTablePtr& GetNameTable() const override + const TNameTablePtr& GetNameTable() const override { return UnderlyingWriter_->GetNameTable(); } - const NTableClient::TTableSchemaPtr& GetSchema() const override + const TTableSchemaPtr& GetSchema() const override { return UnderlyingWriter_->GetSchema(); } @@ -57,7 +62,7 @@ private: const IUnversionedWriterPtr UnderlyingWriter_; }; -NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter( +ITableWriterPtr CreateApiFromSchemalessWriterAdapter( IUnversionedWriterPtr underlyingWriter) { return New<TApiFromSchemalessWriterAdapter>(std::move(underlyingWriter)); @@ -69,11 +74,14 @@ class TSchemalessApiFromWriterAdapter : public IUnversionedWriter { public: - explicit TSchemalessApiFromWriterAdapter(NApi::ITableWriterPtr underlyingWriter) + TSchemalessApiFromWriterAdapter( + IRowBatchWriterPtr underlyingWriter, + TTableSchemaPtr schema) : UnderlyingWriter_(std::move(underlyingWriter)) + , Schema_(std::move(schema)) { } - bool Write(TRange<NTableClient::TUnversionedRow> rows) override + bool Write(TRange<TUnversionedRow> rows) override { return UnderlyingWriter_->Write(rows); } @@ -88,14 +96,14 @@ public: return UnderlyingWriter_->Close(); } - const NTableClient::TNameTablePtr& GetNameTable() const override + const TNameTablePtr& GetNameTable() const override { return UnderlyingWriter_->GetNameTable(); } - const NTableClient::TTableSchemaPtr& GetSchema() const override + const TTableSchemaPtr& GetSchema() const override { - return UnderlyingWriter_->GetSchema(); + return Schema_; } std::optional<TMD5Hash> GetDigest() const override @@ -104,19 +112,26 @@ public: } private: - const NApi::ITableWriterPtr UnderlyingWriter_; + const IRowBatchWriterPtr UnderlyingWriter_; + const TTableSchemaPtr Schema_; }; IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter( - NApi::ITableWriterPtr underlyingWriter) + IRowBatchWriterPtr underlyingWriter) +{ + return New<TSchemalessApiFromWriterAdapter>(std::move(underlyingWriter), New<TTableSchema>()); +} + +IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter( + ITableWriterPtr underlyingWriter) { - return New<TSchemalessApiFromWriterAdapter>(std::move(underlyingWriter)); + return New<TSchemalessApiFromWriterAdapter>(underlyingWriter, underlyingWriter->GetSchema()); } //////////////////////////////////////////////////////////////////////////////// void PipeReaderToWriter( - const ITableReaderPtr& reader, + const IRowBatchReaderPtr& reader, const IUnversionedRowsetWriterPtr& writer, const TPipeReaderToWriterOptions& options) { @@ -179,78 +194,51 @@ void PipeReaderToWriter( } void PipeReaderToWriterByBatches( - const ITableReaderPtr& reader, - const NFormats::ISchemalessFormatWriterPtr& writer, - const TRowBatchReadOptions& options, + const IRowBatchReaderPtr& reader, + const ISchemalessFormatWriterPtr& writer, + TRowBatchReadOptions options, + TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater, TDuration pipeDelay) -try { - TPeriodicYielder yielder(TDuration::Seconds(1)); - - while (auto batch = reader->Read(options)) { - yielder.TryYield(); - - if (batch->IsEmpty()) { - WaitFor(reader->GetReadyEvent()) - .ThrowOnError(); - continue; - } - - if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) { - TDelayedExecutor::WaitForDuration(pipeDelay); - } +{ + try { + TPeriodicYielder yielder(TDuration::Seconds(1)); - if (!writer->WriteBatch(batch)) { - WaitFor(writer->GetReadyEvent()) - .ThrowOnError(); - } - } + while (auto batch = reader->Read(options)) { + yielder.TryYield(); - WaitFor(writer->Close()) - .ThrowOnError(); -} catch (const std::exception& ex) { - YT_LOG_ERROR(ex, "PipeReaderToWriterByBatches failed"); + if (batch->IsEmpty()) { + WaitFor(reader->GetReadyEvent()) + .ThrowOnError(); + continue; + } - THROW_ERROR_EXCEPTION(ex); -} + if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) { + TDelayedExecutor::WaitForDuration(pipeDelay); + } -void PipeReaderToAdaptiveWriterByBatches( - const ITableReaderPtr& reader, - const NFormats::ISchemalessFormatWriterPtr& writer, - TRowBatchReadOptions options, - TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater, - TDuration pipeDelay) -try { - TPeriodicYielder yielder(TDuration::Seconds(1)); + TWallTimer timer(/*start*/ false); - while (auto batch = reader->Read(options)) { - yielder.TryYield(); + if (optionsUpdater) { + timer.Start(); + } - if (batch->IsEmpty()) { - WaitFor(reader->GetReadyEvent()) - .ThrowOnError(); - continue; - } + if (!writer->WriteBatch(batch)) { + WaitFor(writer->GetReadyEvent()) + .ThrowOnError(); + } - if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) { - TDelayedExecutor::WaitForDuration(pipeDelay); + if (optionsUpdater) { + optionsUpdater(&options, timer.GetElapsedTime()); + } } - NProfiling::TWallTimer timer; - - if (!writer->WriteBatch(batch)) { - WaitFor(writer->GetReadyEvent()) - .ThrowOnError(); - } + WaitFor(writer->Close()) + .ThrowOnError(); + } catch (const std::exception& ex) { + YT_LOG_ERROR(ex, "Failed to transfer batches from reader to writer"); - optionsUpdater(&options, timer.GetElapsedTime()); + THROW_ERROR_EXCEPTION(ex); } - - WaitFor(writer->Close()) - .ThrowOnError(); -} catch (const std::exception& ex) { - YT_LOG_ERROR(ex, "PipeReaderToAdaptiveWriterByBatches failed"); - - THROW_ERROR_EXCEPTION(ex); } void PipeInputToOutput( @@ -278,7 +266,7 @@ void PipeInputToOutput( } void PipeInputToOutput( - const NConcurrency::IAsyncInputStreamPtr& input, + const IAsyncInputStreamPtr& input, IOutputStream* output, i64 bufferBlockSize) { @@ -300,7 +288,7 @@ void PipeInputToOutput( } void PipeInputToOutput( - const NConcurrency::IAsyncZeroCopyInputStreamPtr& input, + const IAsyncZeroCopyInputStreamPtr& input, IOutputStream* output) { while (true) { diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h index df629939e3..f90cbe101d 100644 --- a/yt/yt/client/table_client/adapters.h +++ b/yt/yt/client/table_client/adapters.h @@ -1,7 +1,6 @@ #pragma once #include "public.h" -#include "unversioned_writer.h" #include <yt/yt/client/api/table_reader.h> @@ -14,6 +13,9 @@ namespace NYT::NTableClient { //////////////////////////////////////////////////////////////////////////////// IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter( + NApi::IRowBatchWriterPtr underlyingWriter); + +IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter( NApi::ITableWriterPtr underlyingWriter); NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter( @@ -33,22 +35,16 @@ struct TPipeReaderToWriterOptions }; void PipeReaderToWriter( - const NApi::ITableReaderPtr& reader, + const NApi::IRowBatchReaderPtr& reader, const IUnversionedRowsetWriterPtr& writer, const TPipeReaderToWriterOptions& options); //! Parameter #pipeDelay is used only for testing. void PipeReaderToWriterByBatches( - const NApi::ITableReaderPtr& reader, - const NFormats::ISchemalessFormatWriterPtr& writer, - const TRowBatchReadOptions& options, - TDuration pipeDelay = TDuration::Zero()); - -void PipeReaderToAdaptiveWriterByBatches( - const NApi::ITableReaderPtr& reader, + const NApi::IRowBatchReaderPtr& reader, const NFormats::ISchemalessFormatWriterPtr& writer, TRowBatchReadOptions startingOptions, - TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater, + TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater = {}, TDuration pipeDelay = TDuration::Zero()); void PipeInputToOutput( diff --git a/yt/yt/client/unittests/mock/table_reader.h b/yt/yt/client/unittests/mock/table_reader.h index 7a8cf9825c..36ddda47ab 100644 --- a/yt/yt/client/unittests/mock/table_reader.h +++ b/yt/yt/client/unittests/mock/table_reader.h @@ -20,7 +20,7 @@ public: MOCK_METHOD(NChunkClient::NProto::TDataStatistics, GetDataStatistics, (), (const, override)); - MOCK_METHOD(TFuture<void>, GetReadyEvent, (), (override)); + MOCK_METHOD(TFuture<void>, GetReadyEvent, (), (const, override)); MOCK_METHOD(NTableClient::IUnversionedRowBatchPtr, Read, (const NTableClient::TRowBatchReadOptions& options), (override)); |