aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp56
1 files changed, 54 insertions, 2 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 690580285a..18c7a3ad5d 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -9,6 +9,7 @@
#include "init.h"
#include "lock.h"
#include "operation.h"
+#include "partition_reader.h"
#include "retryful_writer.h"
#include "transaction.h"
#include "transaction_pinger.h"
@@ -420,6 +421,14 @@ TRawTableReaderPtr TClientBase::CreateRawReader(
return CreateClientReader(path, format, options).Get();
}
+TRawTableReaderPtr TClientBase::CreateRawTablePartitionReader(
+ const TString& cookie,
+ const TFormat& format,
+ const TTablePartitionReaderOptions& options)
+{
+ return NDetail::CreateTablePartitionReader(RawClient_, ClientRetryPolicy_->CreatePolicyForReaderRequest(), cookie, format, options);
+}
+
TRawTableWriterPtr TClientBase::CreateRawWriter(
const TRichYPath& path,
const TFormat& format,
@@ -883,6 +892,45 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
std::move(skiffOptions));
}
+::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeTablePartitionReader(
+ const TString& cookie,
+ const TTablePartitionReaderOptions& options)
+{
+ auto format = TFormat::YsonBinary();
+ ApplyFormatHints<TNode>(&format, options.FormatHints_);
+
+ return MakeIntrusive<TNodeTableReader>(CreateRawTablePartitionReader(cookie, format, options));
+}
+
+::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoTablePartitionReader(
+ const TString& cookie,
+ const TTablePartitionReaderOptions& options,
+ const Message* prototype)
+{
+ auto descriptors = TVector<const ::google::protobuf::Descriptor*>{
+ prototype->GetDescriptor(),
+ };
+ auto format = TFormat::Protobuf(descriptors, Context_.Config->ProtobufFormatWithDescriptors);
+ return MakeIntrusive<TLenvalProtoTableReader>(
+ CreateRawTablePartitionReader(cookie, format, options),
+ std::move(descriptors));
+}
+
+::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowTablePartitionReader(
+ const TString& cookie,
+ const TTablePartitionReaderOptions& options,
+ const ISkiffRowSkipperPtr& skipper,
+ const NSkiff::TSkiffSchemaPtr& schema)
+{
+ auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
+ auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
+ return new TSkiffRowTableReader(
+ CreateRawTablePartitionReader(cookie, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
+ resultSchema,
+ {skipper},
+ std::move(skiffOptions));
+}
+
::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
const TRichYPath& path, const TTableWriterOptions& options)
{
@@ -1561,13 +1609,17 @@ TClientContext CreateClientContext(
context.Config = options.Config_ ? options.Config_ : TConfig::Get();
context.TvmOnly = options.TvmOnly_;
context.ProxyAddress = options.ProxyAddress_;
- context.ProxyUnixDomainSocket = options.ProxyUnixDomainSocket_;
+ context.UseProxyUnixDomainSocket = options.UseProxyUnixDomainSocket_;
if (options.UseTLS_) {
context.UseTLS = *options.UseTLS_;
}
- SetupClusterContext(context, serverName);
+ if (!options.UseProxyUnixDomainSocket_) {
+ SetupClusterContext(context, serverName);
+ } else {
+ context.ServerName = serverName;
+ }
if (context.Config->HttpProxyRole && context.Config->Hosts == DefaultHosts) {
context.Config->Hosts = "hosts?role=" + context.Config->HttpProxyRole;