summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/partition_reader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/client/partition_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/client/partition_reader.cpp66
1 files changed, 66 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/partition_reader.cpp b/yt/cpp/mapreduce/client/partition_reader.cpp
new file mode 100644
index 00000000000..1610a087cc9
--- /dev/null
+++ b/yt/cpp/mapreduce/client/partition_reader.cpp
@@ -0,0 +1,66 @@
+#include "partition_reader.h"
+
+#include <yt/cpp/mapreduce/common/retry_request.h>
+
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TPartitionTableReader
+ : public TRawTableReader
+{
+public:
+ TPartitionTableReader(std::unique_ptr<IInputStream> input)
+ : Input_(std::move(input))
+ { }
+
+ bool Retry(
+ const TMaybe<ui32>& /*rangeIndex*/,
+ const TMaybe<ui64>& /*rowIndex*/,
+ const std::exception_ptr& /*error*/) override
+ {
+ return false;
+ }
+
+ void ResetRetries() override
+ { }
+
+ bool HasRangeIndices() const override
+ {
+ return false;
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ return Input_->Read(buf, len);
+ }
+
+private:
+ std::unique_ptr<IInputStream> Input_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRawTableReaderPtr CreateTablePartitionReader(
+ const IRawClientPtr& rawClient,
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TString& cookie,
+ const TMaybe<TFormat>& format,
+ const TTablePartitionReaderOptions& options)
+{
+
+ auto stream = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ retryPolicy,
+ [&] (TMutationId /*mutationId*/) {
+ return rawClient->ReadTablePartition(cookie, format, options);
+ }
+ );
+ return MakeIntrusive<TPartitionTableReader>(std::move(stream));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail