diff options
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index fccf3501a90..18c7a3ad5d4 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -9,6 +9,7 @@ #include "init.h" #include "lock.h" #include "operation.h" +#include "partition_reader.h" #include "retryful_writer.h" #include "transaction.h" #include "transaction_pinger.h" @@ -420,6 +421,14 @@ TRawTableReaderPtr TClientBase::CreateRawReader( return CreateClientReader(path, format, options).Get(); } +TRawTableReaderPtr TClientBase::CreateRawTablePartitionReader( + const TString& cookie, + const TFormat& format, + const TTablePartitionReaderOptions& options) +{ + return NDetail::CreateTablePartitionReader(RawClient_, ClientRetryPolicy_->CreatePolicyForReaderRequest(), cookie, format, options); +} + TRawTableWriterPtr TClientBase::CreateRawWriter( const TRichYPath& path, const TFormat& format, @@ -883,6 +892,45 @@ THolder<TClientWriter> TClientBase::CreateClientWriter( std::move(skiffOptions)); } +::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options) +{ + auto format = TFormat::YsonBinary(); + ApplyFormatHints<TNode>(&format, options.FormatHints_); + + return MakeIntrusive<TNodeTableReader>(CreateRawTablePartitionReader(cookie, format, options)); +} + +::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options, + const Message* prototype) +{ + auto descriptors = TVector<const ::google::protobuf::Descriptor*>{ + prototype->GetDescriptor(), + }; + auto format = TFormat::Protobuf(descriptors, Context_.Config->ProtobufFormatWithDescriptors); + return MakeIntrusive<TLenvalProtoTableReader>( + CreateRawTablePartitionReader(cookie, format, options), + std::move(descriptors)); +} + +::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options, + const ISkiffRowSkipperPtr& skipper, + const NSkiff::TSkiffSchemaPtr& schema) +{ + auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true); + auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions); + return new TSkiffRowTableReader( + CreateRawTablePartitionReader(cookie, NYT::NDetail::CreateSkiffFormat(resultSchema), options), + resultSchema, + {skipper}, + std::move(skiffOptions)); +} + ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter( const TRichYPath& path, const TTableWriterOptions& options) { |
