1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
From 308af41748bc15ff794126226cb7bc4be14bf084 Mon Sep 17 00:00:00 2001
From: Aleksandr Khoroshilov <hor911@ydb.tech>
Date: Mon, 6 Mar 2023 02:11:54 +0300
Subject: [PATCH 1/2] Split arrow::FileReader::ReadRowGroups() for flexible
async IO
---
cpp/src/parquet/arrow/reader.cc | 30 +++++++++++++++++++++++++++---
cpp/src/parquet/arrow/reader.h | 7 +++++++
2 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 5b39de93d9ccf..11e543f935b61 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -311,6 +311,13 @@ class FileReaderImpl : public FileReader {
return ReadTable(Iota(reader_->metadata()->num_columns()), table);
}
+ Status WillNeedRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) override;
+
+ Status DecodeRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) override;
+
Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;
@@ -1216,9 +1223,8 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
return Status::OK();
}
-Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices,
- std::shared_ptr<Table>* out) {
+Status FileReaderImpl::WillNeedRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
@@ -1229,6 +1235,24 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
+ return Status::OK();
+}
+
+Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) {
+ RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+
+ auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
+ /*cpu_executor=*/nullptr);
+ ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
+ return Status::OK();
+}
+
+Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<Table>* out) {
+ RETURN_NOT_OK(WillNeedRowGroups(row_groups, column_indices));
auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
/*cpu_executor=*/nullptr);
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 5dff35e887ef0..fbabeba7c764f 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -249,6 +249,13 @@ class PARQUET_EXPORT FileReader {
virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;
+ virtual ::arrow::Status WillNeedRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) = 0;
+
+ virtual ::arrow::Status DecodeRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) = 0;
+
virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) = 0;
From a82d7512faa11b01ff29fb724dd115f62e223aed Mon Sep 17 00:00:00 2001
From: Aleksandr Khoroshilov <hor911@ydb.tech>
Date: Mon, 6 Mar 2023 03:16:53 +0300
Subject: [PATCH 2/2] Clang formatting
---
cpp/src/parquet/arrow/reader.cc | 6 +++---
cpp/src/parquet/arrow/reader.h | 2 +-
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 11e543f935b61..d361319f3c96a 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -312,7 +312,7 @@ class FileReaderImpl : public FileReader {
}
Status WillNeedRowGroups(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices) override;
+ const std::vector<int>& column_indices) override;
Status DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
@@ -1239,8 +1239,8 @@ Status FileReaderImpl::WillNeedRowGroups(const std::vector<int>& row_groups,
}
Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices,
- std::shared_ptr<::arrow::Table>* out) {
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index fbabeba7c764f..33e8677ef7c15 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -250,7 +250,7 @@ class PARQUET_EXPORT FileReader {
virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;
virtual ::arrow::Status WillNeedRowGroups(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices) = 0;
+ const std::vector<int>& column_indices) = 0;
virtual ::arrow::Status DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
|