// Extracted from a cgit web view; the navigation bar and line-number gutter were removed.
// path: root/contrib/libs/apache/arrow_next/cpp/src/arrow/memory_pool.h
// blob: 857a587e7b54791405b0f4a1f18c472a9a03ca0c
#pragma clang system_header
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/result.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_fwd.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/visibility.h"

namespace arrow20 {

namespace internal {

///////////////////////////////////////////////////////////////////////
// Helper tracking memory statistics

/// \brief Atomic counters describing the allocation activity of a memory pool
///
/// The class is aligned to 64 bytes so that its four atomic counters sit on
/// the same cache line.
class alignas(64) MemoryPoolStats {
 private:
  // All atomics are updated with Acquire-Release ordering, see
  // https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering
  //
  // max_memory_, total_allocated_bytes_ and num_allocs_ never decrease
  // (they are monotonically increasing), which permits some shortcuts below.
  std::atomic<int64_t> max_memory_{0};
  std::atomic<int64_t> bytes_allocated_{0};
  std::atomic<int64_t> total_allocated_bytes_{0};
  std::atomic<int64_t> num_allocs_{0};

 public:
  /// Highest value of the in-use byte count recorded by DidAllocateBytes().
  int64_t max_memory() const {
    return max_memory_.load(std::memory_order_acquire);
  }

  /// Bytes currently accounted as allocated (allocations minus frees).
  int64_t bytes_allocated() const {
    return bytes_allocated_.load(std::memory_order_acquire);
  }

  /// Sum of all sizes ever passed to DidAllocateBytes(); frees are not subtracted.
  int64_t total_bytes_allocated() const {
    return total_allocated_bytes_.load(std::memory_order_acquire);
  }

  /// Number of times DidAllocateBytes() has been invoked.
  int64_t num_allocations() const {
    return num_allocs_.load(std::memory_order_acquire);
  }

  /// Record that `size` additional bytes were allocated.
  ///
  /// Bumps the in-use, cumulative and allocation-count counters, then raises
  /// the recorded peak if the new in-use total exceeds it.
  void DidAllocateBytes(int64_t size) {
    // Read the peak before anything else. Because max_memory_ only ever
    // grows, a relaxed load ahead of the read-modify-write is sufficient.
    int64_t observed_peak = max_memory_.load(std::memory_order_relaxed);
    const int64_t in_use_before =
        bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
    // Update the counters whose results are not needed to continue. By the
    // time they are done, observed_peak and in_use_before are more likely to
    // be available in registers, and the three atomic updates end up next to
    // each other in the instruction stream.
    total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
    num_allocs_.fetch_add(1, std::memory_order_acq_rel);

    const int64_t in_use_now = in_use_before + size;
    // Try to publish in_use_now as the new peak. A failed exchange refreshes
    // observed_peak; as soon as it is no longer below in_use_now, some other
    // thread has already recorded a peak at least as high and we stop.
    for (;;) {
      if (observed_peak >= in_use_now) {
        break;
      }
      if (max_memory_.compare_exchange_weak(
              /*expected=*/observed_peak, /*desired=*/in_use_now,
              std::memory_order_acq_rel)) {
        break;
      }
    }
  }

  /// Record that an allocation changed size from `old_size` to `new_size`.
  ///
  /// Growth is accounted as an allocation of the difference; anything else
  /// (shrinking or an unchanged size) is accounted as a free of the difference.
  void DidReallocateBytes(int64_t old_size, int64_t new_size) {
    if (new_size <= old_size) {
      DidFreeBytes(old_size - new_size);
      return;
    }
    DidAllocateBytes(new_size - old_size);
  }

  /// Record that `size` bytes were released; only the in-use counter changes.
  void DidFreeBytes(int64_t size) {
    bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
  }
};

}  // namespace internal

/// Base class for memory allocation on the CPU.
///
/// Besides tracking the number of allocated bytes, the allocator also should
/// take care of the required 64-byte alignment.
///
/// Concrete pools must implement the pure-virtual, alignment-taking
/// Allocate / Reallocate / Free, as well as bytes_allocated(),
/// total_bytes_allocated(), num_allocations() and backend_name(). The shorter
/// Allocate / Reallocate / Free overloads are non-virtual and simply forward
/// to the virtual ones with kDefaultBufferAlignment (declared outside this
/// header; presumably 64 -- confirm).
class ARROW_EXPORT MemoryPool {
 public:
  virtual ~MemoryPool() = default;

  /// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool
  ///
  /// \return a pool owned by the caller through the returned unique_ptr
  static std::unique_ptr<MemoryPool> CreateDefault();

  /// Allocate a new memory region of at least size bytes.
  ///
  /// The allocated region shall be 64-byte aligned.
  ///
  /// Convenience overload: forwards to the three-argument Allocate() with
  /// kDefaultBufferAlignment as the alignment.
  Status Allocate(int64_t size, uint8_t** out) {
    return Allocate(size, kDefaultBufferAlignment, out);
  }

  /// Allocate a new memory region of at least size bytes aligned to alignment.
  ///
  /// @param size Requested size in bytes
  /// @param alignment Requested alignment of the region, in bytes
  /// @param out Location that receives the pointer to the region
  ///   (presumably only meaningful when the returned Status is OK -- confirm)
  virtual Status Allocate(int64_t size, int64_t alignment, uint8_t** out) = 0;

  /// Resize an already allocated memory section.
  ///
  /// Because the default allocators on most platforms don't support aligned
  /// reallocation, this function can involve a copy of the underlying data.
  ///
  /// @param old_size Current size of the region pointed to by *ptr
  /// @param new_size Requested new size
  /// @param alignment Alignment of the region, in bytes
  /// @param ptr In/out pointer to the region (passed by address, so the
  ///   implementation may replace it)
  virtual Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                            uint8_t** ptr) = 0;
  /// Convenience overload: forwards to the four-argument Reallocate() with
  /// kDefaultBufferAlignment as the alignment.
  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
    return Reallocate(old_size, new_size, kDefaultBufferAlignment, ptr);
  }

  /// Free an allocated region.
  ///
  /// @param buffer Pointer to the start of the allocated memory region
  /// @param size Allocated size located at buffer. An allocator implementation
  ///   may use this for tracking the amount of allocated bytes as well as for
  ///   faster deallocation if supported by its backend.
  /// @param alignment The alignment of the allocation. Defaults to 64 bytes.
  virtual void Free(uint8_t* buffer, int64_t size, int64_t alignment) = 0;
  /// Convenience overload: forwards to the three-argument Free() with
  /// kDefaultBufferAlignment as the alignment.
  void Free(uint8_t* buffer, int64_t size) {
    Free(buffer, size, kDefaultBufferAlignment);
  }

  /// Return unused memory to the OS
  ///
  /// Only applies to allocators that hold onto unused memory.  This will be
  /// best effort, a memory pool may not implement this feature or may be
  /// unable to fulfill the request due to fragmentation.
  ///
  /// The default implementation does nothing.
  virtual void ReleaseUnused() {}

  /// Print statistics
  ///
  /// Print allocation statistics on stderr. The output format is
  /// implementation-specific. Not all memory pools implement this method.
  ///
  /// The default implementation does nothing.
  virtual void PrintStats() {}

  /// The number of bytes that were allocated and not yet freed through
  /// this allocator.
  virtual int64_t bytes_allocated() const = 0;

  /// Return peak memory allocation in this memory pool
  ///
  /// Unlike the other statistics accessors this one is not pure virtual; its
  /// base implementation is defined outside this header.
  ///
  /// \return Maximum bytes allocated. If not known (or not implemented),
  /// returns -1
  virtual int64_t max_memory() const;

  /// The number of bytes that were allocated.
  virtual int64_t total_bytes_allocated() const = 0;

  /// The number of allocations or reallocations that were requested.
  virtual int64_t num_allocations() const = 0;

  /// The name of the backend used by this MemoryPool (e.g. "system" or "jemalloc").
  virtual std::string backend_name() const = 0;

 protected:
  // Protected so that only derived classes can construct a MemoryPool.
  MemoryPool() = default;
};

/// MemoryPool implementation that wraps another MemoryPool.
///
/// Every virtual of MemoryPool is overridden; the definitions live outside
/// this header. NOTE(review): judging by the name, each call is presumably
/// logged and then forwarded to the wrapped pool -- confirm against the
/// implementation file.
///
/// The wrapped pool is held as a raw pointer and the destructor is defaulted,
/// so this class never releases it; presumably the wrapped pool must outlive
/// this object.
class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
 public:
  /// \param pool the pool to wrap (stored as-is in pool_)
  explicit LoggingMemoryPool(MemoryPool* pool);
  ~LoggingMemoryPool() override = default;

  // Re-expose the base-class convenience overloads (the ones without an
  // alignment argument); the overrides declared below would otherwise hide
  // them.
  using MemoryPool::Allocate;
  using MemoryPool::Free;
  using MemoryPool::Reallocate;

  Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
  Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                    uint8_t** ptr) override;
  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
  void ReleaseUnused() override;
  void PrintStats() override;

  int64_t bytes_allocated() const override;

  int64_t max_memory() const override;

  int64_t total_bytes_allocated() const override;

  int64_t num_allocations() const override;

  std::string backend_name() const override;

 private:
  // Wrapped pool; not deleted by this class.
  MemoryPool* pool_;
};

/// Derived class for memory allocation.
///
/// Tracks the number of bytes and maximum memory allocated through its direct
/// calls. Actual allocation is delegated to MemoryPool class.
///
/// All state lives in the nested ProxyMemoryPoolImpl class, which is only
/// forward-declared here and defined outside this header.
class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
 public:
  /// \param pool the pool that allocation is delegated to
  ///   (NOTE(review): passed as a raw pointer; presumably not owned and
  ///   expected to outlive this object -- confirm)
  explicit ProxyMemoryPool(MemoryPool* pool);
  // Declared here but defined out of line: ProxyMemoryPoolImpl is an
  // incomplete type in this header, so the unique_ptr member cannot be
  // destroyed from an inline destructor.
  ~ProxyMemoryPool() override;

  // Re-expose the base-class convenience overloads (the ones without an
  // alignment argument); the overrides declared below would otherwise hide
  // them.
  using MemoryPool::Allocate;
  using MemoryPool::Free;
  using MemoryPool::Reallocate;

  Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
  Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                    uint8_t** ptr) override;
  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
  void ReleaseUnused() override;
  void PrintStats() override;

  int64_t bytes_allocated() const override;

  int64_t max_memory() const override;

  int64_t total_bytes_allocated() const override;

  int64_t num_allocations() const override;

  std::string backend_name() const override;

 private:
  // Opaque implementation object, owned exclusively by this proxy.
  class ProxyMemoryPoolImpl;
  std::unique_ptr<ProxyMemoryPoolImpl> impl_;
};

/// \brief Return a process-wide memory pool based on the system allocator.
///
/// NOTE(review): the pool is returned as a raw pointer; presumably it is owned
/// by the library and must not be deleted by the caller -- confirm.
ARROW_EXPORT MemoryPool* system_memory_pool();

/// \brief Return a process-wide memory pool based on jemalloc.
///
/// May return NotImplemented if jemalloc is not available.
///
/// \param[out] out location that receives the pool pointer
///   (presumably left untouched when the returned Status is not OK -- confirm)
/// \return Status of the operation
ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);

/// \brief Set jemalloc memory page purging behavior for future-created arenas
/// to the indicated number of milliseconds. See dirty_decay_ms and
/// muzzy_decay_ms options in jemalloc for a description of what these do. The
/// default is configured to 1000 (1 second) which releases memory more
/// aggressively to the operating system than the jemalloc default of 10
/// seconds. If you set the value to 0, dirty / muzzy pages will be released
/// immediately rather than with a time decay, but this may reduce application
/// performance.
///
/// \param ms decay time in milliseconds; 0 requests immediate release
/// \return Status of the operation
ARROW_EXPORT
Status jemalloc_set_decay_ms(int ms);

/// \brief Get basic statistics from jemalloc's mallctl.
/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
/// available stats.
///
/// \param name name of the statistic to query, as listed in the mallctl namespace
/// \return the value of the statistic, or an error Status
ARROW_EXPORT
Result<int64_t> jemalloc_get_stat(const char* name);

/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
/// This affects subsequent calls to thread.peak.read, but not the values returned by
/// thread.allocated or thread.deallocated.
///
/// \return Status of the operation
ARROW_EXPORT
Status jemalloc_peak_reset();

/// \brief Print summary statistics in human-readable form to stderr.
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param opts option flags as described for malloc_stats_print; defaults to ""
/// \return Status of the operation
ARROW_EXPORT
Status jemalloc_stats_print(const char* opts = "");

/// \brief Print summary statistics in human-readable form using a callback
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param write_cb callback that receives the statistics text
///   (NOTE(review): possibly invoked several times with partial output -- confirm)
/// \param opts option flags as described for malloc_stats_print; defaults to ""
/// \return Status of the operation
ARROW_EXPORT
Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
                            const char* opts = "");

/// \brief Get summary statistics in human-readable form.
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param opts option flags as described for malloc_stats_print; defaults to ""
/// \return the statistics text, or an error Status
ARROW_EXPORT
Result<std::string> jemalloc_stats_string(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
///
/// May return NotImplemented if mimalloc is not available.
///
/// \param[out] out location that receives the pool pointer
///   (presumably left untouched when the returned Status is not OK -- confirm)
/// \return Status of the operation
ARROW_EXPORT Status mimalloc_memory_pool(MemoryPool** out);

/// \brief Return the names of the backends supported by this Arrow build.
///
/// NOTE(review): this declaration uses std::vector, but <vector> is not among
/// the headers this file includes directly; it presumably arrives through one
/// of the project includes -- consider including <vector> explicitly.
ARROW_EXPORT std::vector<std::string> SupportedMemoryBackendNames();

}  // namespace arrow20