diff options
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 56 |
1 files changed, 54 insertions, 2 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 690580285a..18c7a3ad5d 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -9,6 +9,7 @@ #include "init.h" #include "lock.h" #include "operation.h" +#include "partition_reader.h" #include "retryful_writer.h" #include "transaction.h" #include "transaction_pinger.h" @@ -420,6 +421,14 @@ TRawTableReaderPtr TClientBase::CreateRawReader( return CreateClientReader(path, format, options).Get(); } +TRawTableReaderPtr TClientBase::CreateRawTablePartitionReader( + const TString& cookie, + const TFormat& format, + const TTablePartitionReaderOptions& options) +{ + return NDetail::CreateTablePartitionReader(RawClient_, ClientRetryPolicy_->CreatePolicyForReaderRequest(), cookie, format, options); +} + TRawTableWriterPtr TClientBase::CreateRawWriter( const TRichYPath& path, const TFormat& format, @@ -883,6 +892,45 @@ THolder<TClientWriter> TClientBase::CreateClientWriter( std::move(skiffOptions)); } +::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options) +{ + auto format = TFormat::YsonBinary(); + ApplyFormatHints<TNode>(&format, options.FormatHints_); + + return MakeIntrusive<TNodeTableReader>(CreateRawTablePartitionReader(cookie, format, options)); +} + +::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options, + const Message* prototype) +{ + auto descriptors = TVector<const ::google::protobuf::Descriptor*>{ + prototype->GetDescriptor(), + }; + auto format = TFormat::Protobuf(descriptors, Context_.Config->ProtobufFormatWithDescriptors); + return MakeIntrusive<TLenvalProtoTableReader>( + CreateRawTablePartitionReader(cookie, format, options), + std::move(descriptors)); +} + +::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowTablePartitionReader( + const TString& cookie, + const TTablePartitionReaderOptions& options, + const ISkiffRowSkipperPtr& skipper, + const NSkiff::TSkiffSchemaPtr& schema) +{ + auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true); + auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions); + return new TSkiffRowTableReader( + CreateRawTablePartitionReader(cookie, NYT::NDetail::CreateSkiffFormat(resultSchema), options), + resultSchema, + {skipper}, + std::move(skiffOptions)); +} + ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter( const TRichYPath& path, const TTableWriterOptions& options) { @@ -1561,13 +1609,17 @@ TClientContext CreateClientContext( context.Config = options.Config_ ? options.Config_ : TConfig::Get(); context.TvmOnly = options.TvmOnly_; context.ProxyAddress = options.ProxyAddress_; - context.ProxyUnixDomainSocket = options.ProxyUnixDomainSocket_; + context.UseProxyUnixDomainSocket = options.UseProxyUnixDomainSocket_; if (options.UseTLS_) { context.UseTLS = *options.UseTLS_; } - SetupClusterContext(context, serverName); + if (!options.UseProxyUnixDomainSocket_) { + SetupClusterContext(context, serverName); + } else { + context.ServerName = serverName; + } if (context.Config->HttpProxyRole && context.Config->Hosts == DefaultHosts) { context.Config->Hosts = "hosts?role=" + context.Config->HttpProxyRole; |