summaryrefslogtreecommitdiffstats
path: root/contrib/libs/apache/arrow_next/cpp/src/arrow/util/thread_pool.h
blob: 7548e44a1566533b9d73198a906e4ca1469ab32b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
#pragma clang system_header
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <queue>
#include <type_traits>
#include <unordered_set>
#include <utility>

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/result.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/cancel.h"
#include "contrib/libs/apache/arrow_next/src/arrow/util/config.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/functional.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/future.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/iterator.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/macros.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/visibility.h"

#if defined(_MSC_VER)
// Disable harmless warning for decorated name length limit
#  pragma warning(disable : 4503)
#endif

namespace arrow20 {

/// \brief Get the capacity of the global thread pool
///
/// Return the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.  This is an ideal (target) number,
/// not necessarily the exact number of threads at a given point in time.
///
/// You can change this number using SetCpuThreadPoolCapacity().
///
/// \return the target number of worker threads of the global CPU thread pool
ARROW_EXPORT int GetCpuThreadPoolCapacity();

/// \brief Set the capacity of the global thread pool
///
/// Set the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.
///
/// The current number is returned by GetCpuThreadPoolCapacity().
///
/// \param threads the desired number of worker threads
/// \return the Status of the operation (the failure conditions are defined by the
///         implementation and are not visible in this header)
ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);

namespace internal {

// Optional scheduling hints that accompany a task handed to an Executor.
// An Executor is free to use them; the provided ThreadPool implementation
// ignores them.
struct TaskHints {
  // Urgency of the task: the lower the value, the more urgent
  int32_t priority{0};
  // Size of the IO transfer, in bytes
  int64_t io_size{-1};
  // Approximate CPU cost, in number of instructions
  int64_t cpu_cost{-1};
  // Application-specific identifier
  int64_t external_id{-1};
};

/// \brief Abstract interface for something that can run tasks
///
/// Concrete executors implement SpawnReal() and GetCapacity().  The Spawn() and
/// Submit() helpers declared here are thin wrappers that end up calling SpawnReal().
class ARROW_EXPORT Executor {
 public:
  // Callback type passed to SpawnReal() together with a StopToken; it receives a Status.
  using StopCallback = internal::FnOnce<void(const Status&)>;

  virtual ~Executor();

  // Spawn a fire-and-forget task.
  //
  // Every overload forwards to SpawnReal().  Arguments that an overload does not take
  // default to TaskHints{}, StopToken::Unstoppable() and an empty StopCallback.
  template <typename Function>
  Status Spawn(Function&& func) {
    return SpawnReal(TaskHints{}, std::forward<Function>(func), StopToken::Unstoppable(),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(Function&& func, StopToken stop_token) {
    return SpawnReal(TaskHints{}, std::forward<Function>(func), std::move(stop_token),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func) {
    return SpawnReal(hints, std::forward<Function>(func), StopToken::Unstoppable(),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func, StopToken stop_token) {
    return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func, StopToken stop_token,
               StopCallback stop_callback) {
    return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
                     std::move(stop_callback));
  }

  // Transfers a future to this executor.  Any continuations added to the
  // returned future will run in this executor.  Otherwise they would run
  // on the same thread that called MarkFinished.
  //
  // This is necessary when (for example) an I/O task is completing a future.
  // The continuations of that future should run on the CPU thread pool keeping
  // CPU heavy work off the I/O thread pool.  So the I/O task should transfer
  // the future to the CPU executor before returning.
  //
  // By default this method will only transfer if the future is not already completed.  If
  // the future is already completed then any callback would be run synchronously and so
  // no transfer is typically necessary.  However, in cases where you want to force a
  // transfer (e.g. to help the scheduler break up units of work across multiple cores)
  // then you can use TransferAlways() instead.
  template <typename T>
  Future<T> Transfer(Future<T> future) {
    return DoTransfer(std::move(future), false);
  }

  // Overload of Transfer which will always schedule callbacks on new threads even if the
  // future is finished when the callback is added.
  //
  // This can be useful in cases where you want to ensure parallelism
  template <typename T>
  Future<T> TransferAlways(Future<T> future) {
    return DoTransfer(std::move(future), true);
  }

  // Submit a callable and arguments for execution.  Return a future that
  // will return the callable's result value once.
  // The callable's arguments are copied before execution.
  //
  // If SpawnReal() fails, its error Status is returned instead of a future.
  //
  // The stop callback handed to SpawnReal() only keeps a weak reference to the
  // future: when invoked, it marks the future finished with the given Status,
  // provided the weak reference still yields a valid future.
  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow20::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&& func,
                            Args&&... args) {
    using ValueType = typename FutureType::ValueType;

    auto future = FutureType::Make();
    auto task = std::bind(::arrow20::detail::ContinueFuture{}, future,
                          std::forward<Function>(func), std::forward<Args>(args)...);
    struct {
      WeakFuture<ValueType> weak_fut;

      void operator()(const Status& st) {
        auto fut = weak_fut.get();
        if (fut.is_valid()) {
          fut.MarkFinished(st);
        }
      }
    } stop_callback{WeakFuture<ValueType>(future)};
    ARROW_RETURN_NOT_OK(SpawnReal(hints, std::move(task), std::move(stop_token),
                                  std::move(stop_callback)));

    return future;
  }

  // Same as above, with default TaskHints.
  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow20::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(StopToken stop_token, Function&& func, Args&&... args) {
    // `stop_token` is a by-value parameter that is not used again: move it along
    // rather than copying it, like the other overloads do.
    return Submit(TaskHints{}, std::move(stop_token), std::forward<Function>(func),
                  std::forward<Args>(args)...);
  }

  // Same as above, with an unstoppable StopToken.
  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow20::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(TaskHints hints, Function&& func, Args&&... args) {
    return Submit(std::move(hints), StopToken::Unstoppable(),
                  std::forward<Function>(func), std::forward<Args>(args)...);
  }

  // Same as above, with default TaskHints and an unstoppable StopToken.
  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow20::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(Function&& func, Args&&... args) {
    return Submit(TaskHints{}, StopToken::Unstoppable(), std::forward<Function>(func),
                  std::forward<Args>(args)...);
  }

  // Return the level of parallelism (the number of tasks that may be executed
  // concurrently).  This may be an approximate number.
  virtual int GetCapacity() = 0;

  // Return true if the thread from which this function is called is owned by this
  // Executor. Returns false if this Executor does not support this property.
  virtual bool OwnsThisThread() { return false; }

  // Return true if this is the current executor being called
  // n.b. this defaults to just calling OwnsThisThread
  // unless the threadpool is disabled
  virtual bool IsCurrentExecutor() { return OwnsThisThread(); }

  /// \brief An interface to represent something with a custom destructor
  ///
  /// \see KeepAlive
  class ARROW_EXPORT Resource {
   public:
    virtual ~Resource() = default;
  };

  /// \brief Keep a resource alive until all executor threads have terminated
  ///
  /// Executors may have static storage duration.  In particular, the CPU and I/O
  /// executors are currently implemented this way.  These threads may access other
  /// objects with static storage duration such as the OpenTelemetry runtime context
  /// the default memory pool, or other static executors.
  ///
  /// The order in which these objects are destroyed is difficult to control.  In order
  /// to ensure those objects remain alive until all threads have finished those objects
  /// should be wrapped in a Resource object and passed into this method.  The given
  /// shared_ptr will be kept alive until all threads have finished their worker loops.
  virtual void KeepAlive(std::shared_ptr<Resource> resource);

 protected:
  ARROW_DISALLOW_COPY_AND_ASSIGN(Executor);

  Executor() = default;

  // Shared implementation of Transfer() and TransferAlways().
  //
  // With `always_transfer` set, a callback is registered on `future` with
  // ShouldSchedule::Always and this executor as the callback executor; the callback
  // forwards the result to a freshly made future, which is returned.
  //
  // Otherwise a callback is registered through TryAddCallback().  That callback spawns
  // a task on this executor which marks the new future finished with the result; if
  // spawning fails, the new future is finished with the spawn error instead.  If
  // TryAddCallback() reports that no callback was added, the original future is
  // returned unchanged.
  template <typename T, typename FT = Future<T>, typename FTSync = typename FT::SyncType>
  Future<T> DoTransfer(Future<T> future, bool always_transfer = false) {
    auto transferred = Future<T>::Make();
    if (always_transfer) {
      CallbackOptions callback_options = CallbackOptions::Defaults();
      callback_options.should_schedule = ShouldSchedule::Always;
      callback_options.executor = this;
      auto sync_callback = [transferred](const FTSync& result) mutable {
        transferred.MarkFinished(result);
      };
      // The callback is not used again here: move it (and the future it captured)
      // instead of copying it.
      future.AddCallback(std::move(sync_callback), callback_options);
      return transferred;
    }

    // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of
    // work by doing the test here.
    auto callback = [this, transferred](const FTSync& result) mutable {
      auto spawn_status =
          Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
      if (!spawn_status.ok()) {
        transferred.MarkFinished(spawn_status);
      }
    };
    auto callback_factory = [&callback]() { return callback; };
    if (future.TryAddCallback(callback_factory)) {
      return transferred;
    }
    // If the future is already finished and we aren't going to force spawn a thread
    // then we don't need to add another layer of callback and can return the original
    // future
    return future;
  }

  // Subclassing API
  //
  // Schedule `task` for execution.  The hints, stop token and stop callback are the
  // ones given to Spawn() / Submit(), or their defaults.
  virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                           StopCallback&&) = 0;
};

/// \brief An executor implementation that runs all tasks on a single thread using an
/// event loop.
///
/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
/// fine but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
///
/// The constructor is protected: from outside the class hierarchy the executor is used
/// through the static helpers RunInSerialExecutor() and IterateGenerator().
class ARROW_EXPORT SerialExecutor : public Executor {
 public:
  // A task that is given the executor to schedule work on and returns a future
  // for the overall result.
  template <typename T = ::arrow20::internal::Empty>
  using TopLevelTask = internal::FnOnce<Future<T>(Executor*)>;

  ~SerialExecutor() override;

  // A serial executor runs one task at a time
  int GetCapacity() override { return 1; }
  bool OwnsThisThread() override;
  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                   StopCallback&&) override;

  // Return the number of tasks either running or in the queue.
  int GetNumTasks();

  /// \brief Runs the TopLevelTask and any scheduled tasks
  ///
  /// The TopLevelTask (or one of the tasks it schedules) must either return an invalid
  /// status or call the finish signal. Failure to do this will result in a deadlock.  For
  /// this reason it is preferable (if possible) to use the helper methods (below)
  /// RunSynchronously/RunSerially which delegates the responsibility onto a Future
  /// producer's existing responsibility to always mark a future finished (which can
  /// someday be aided by ARROW-12207).
  ///
  /// A temporary SerialExecutor is created for the call; the returned value is the
  /// synchronous form of the future produced by `initial_task`.
  template <typename T = internal::Empty, typename FT = Future<T>,
            typename FTSync = typename FT::SyncType>
  static FTSync RunInSerialExecutor(TopLevelTask<T> initial_task) {
    Future<T> fut = SerialExecutor().Run<T>(std::move(initial_task));
    return FutureToSync(fut);
  }

  /// \brief Transform an AsyncGenerator into an Iterator
  ///
  /// An event loop will be created and each call to Next will power the event loop with
  /// the calling thread until the next item is ready to be delivered.
  ///
  /// If `initial_task` fails to produce a generator, an error iterator carrying that
  /// status is returned.
  ///
  /// Note: The iterator's destructor will run until the given generator is fully
  /// exhausted. If you wish to abandon iteration before completion then the correct
  /// approach is to use a stop token to cause the generator to exhaust early.
  template <typename T>
  static Iterator<T> IterateGenerator(
      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
    // The constructor is protected, hence `new` rather than std::make_unique
    auto serial_executor = std::unique_ptr<SerialExecutor>(new SerialExecutor());
    auto maybe_generator = std::move(initial_task)(serial_executor.get());
    if (!maybe_generator.ok()) {
      return MakeErrorIterator<T>(maybe_generator.status());
    }
    auto generator = maybe_generator.MoveValueUnsafe();
    // Iterator implementation owning both the executor and the generator
    struct SerialIterator {
      SerialIterator(std::unique_ptr<SerialExecutor> executor,
                     std::function<Future<T>()> generator)
          : executor(std::move(executor)), generator(std::move(generator)) {}
      ARROW_DISALLOW_COPY_AND_ASSIGN(SerialIterator);
      ARROW_DEFAULT_MOVE_AND_ASSIGN(SerialIterator);
      ~SerialIterator() {
        // A serial iterator must be consumed before it can be destroyed.  Allowing it to
        // do otherwise would lead to resource leakage.  There will likely be deadlocks at
        // this spot in the future but these will be the result of other bugs and not the
        // fact that we are forcing consumption here.

        // If a streaming API needs to support early abandonment then it should be done so
        // with a cancellation token and not simply discarding the iterator and expecting
        // the underlying work to clean up correctly.

        // `executor` is null in a moved-from iterator: nothing to drain in that case
        if (executor && !executor->IsFinished()) {
          while (true) {
            Result<T> maybe_next = Next();
            if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
              break;
            }
          }
        }
      }

      // Produce the next item, running executor tasks on the calling thread
      // until that item is available.
      Result<T> Next() {
        executor->Unpause();
        // This call may lead to tasks being scheduled in the serial executor
        Future<T> next_fut = generator();
        next_fut.AddCallback([this](const Result<T>& res) {
          // If we're done iterating we should drain the rest of the tasks in the executor
          if (!res.ok() || IsIterationEnd(*res)) {
            executor->Finish();
            return;
          }
          // Otherwise we will break out immediately, leaving the remaining tasks for
          // the next call.
          executor->Pause();
        });
#ifdef ARROW_ENABLE_THREADING
        // future must run on this thread
        // Borrow this thread and run tasks until the future is finished
        executor->RunLoop();
#else
        next_fut.Wait();
#endif
        if (!next_fut.is_finished()) {
          // Not clear this is possible since RunLoop wouldn't generally exit
          // unless we paused/finished which would imply next_fut has been
          // finished.
          return Status::Invalid(
              "Serial executor terminated before next result computed");
        }
        // At this point we may still have tasks in the executor, that is ok.
        // We will run those tasks the next time through.
        return next_fut.result();
      }

      std::unique_ptr<SerialExecutor> executor;
      std::function<Future<T>()> generator;
    };
    return Iterator<T>(SerialIterator{std::move(serial_executor), std::move(generator)});
  }

#ifndef ARROW_ENABLE_THREADING
  // run a pending task from loop
  // returns true if any tasks were run in the last go round the loop (i.e. if it
  // returns false, all executors are waiting)
  static bool RunTasksOnAllExecutors();
  static SerialExecutor* GetCurrentExecutor();

  bool IsCurrentExecutor() override;

#endif

 protected:
  // Run tasks on the calling thread.  Callers in this header rely on it returning
  // once the executor has been paused or finished.
  virtual void RunLoop();

  // State uses mutex
  struct State;
  std::shared_ptr<State> state_;

  SerialExecutor();

  // We mark the serial executor "finished" when there should be
  // no more tasks scheduled on it.  It's not strictly needed but
  // can help catch bugs where we are trying to use the executor
  // after we are done with it.
  void Finish();
  bool IsFinished();
  // We pause the executor when we are running an async generator
  // and we have received an item that we can deliver.
  void Pause();
  void Unpause();

  // Invoke `initial_task` with this executor, arrange for Finish() to be called when
  // the future it returns completes, then run the loop.  Returns that future.
  template <typename T, typename FTSync = typename Future<T>::SyncType>
  Future<T> Run(TopLevelTask<T> initial_task) {
    auto final_fut = std::move(initial_task)(this);
    final_fut.AddCallback([this](const FTSync&) { Finish(); });
    RunLoop();
    return final_fut;
  }

#ifndef ARROW_ENABLE_THREADING
  // we have to run tasks from all live executors
  // during RunLoop if we don't have threading
  static std::unordered_set<SerialExecutor*> all_executors;
  // a pointer to the last one called by the loop
  // so all tasks get spawned equally
  // on multiple calls to RunTasksOnAllExecutors
  static SerialExecutor* last_called_executor;
  // without threading we can't tell which executor called the
  // current process - so we set it in spawning the task
  static SerialExecutor* current_executor;
#endif  // !ARROW_ENABLE_THREADING
};

#ifdef ARROW_ENABLE_THREADING

/// An Executor implementation spawning tasks in FIFO manner on a fixed-size
/// pool of worker threads.
///
/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
/// fine but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
///
/// Instances are obtained through Make() / MakeEternal(); the constructor is protected.
class ARROW_EXPORT ThreadPool : public Executor {
 public:
  // Construct a thread pool with the given number of worker threads
  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  // Like Make(), but takes care that the returned ThreadPool is compatible
  // with destruction late at process exit.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  // Destroy thread pool; the pool will first be shut down
  ~ThreadPool() override;

  // Return the desired number of worker threads.
  // The actual number of workers may lag a bit before being adjusted to
  // match this value.
  int GetCapacity() override;

  // Return the number of tasks either running or in the queue.
  int GetNumTasks();

  // Executor override: return true if the calling thread is owned by this pool.
  bool OwnsThisThread() override;
  // Dynamically change the number of worker threads.
  //
  // This function always returns immediately.
  // If fewer threads are running than this number, new threads are spawned
  // on-demand when needed for task execution.
  // If more threads are running than this number, excess threads are reaped
  // as soon as possible.
  Status SetCapacity(int threads);

  // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
  // This is exposed as a static method to help with testing.
  static int DefaultCapacity();

  // Shutdown the pool.  Once the pool starts shutting down, new tasks
  // cannot be submitted anymore.
  // If "wait" is true, shutdown waits for all pending tasks to be finished.
  // If "wait" is false, workers are stopped as soon as currently executing
  // tasks are finished.
  Status Shutdown(bool wait = true);

  // Wait for the thread pool to become idle
  //
  // This is useful for sequencing tests
  void WaitForIdle();

  // Executor override, see Executor::KeepAlive().
  void KeepAlive(std::shared_ptr<Executor::Resource> resource) override;

  // Internal pool state; only forward-declared in this header.
  struct State;

 protected:
  // Give the named tests and GetCpuThreadPool() access to the protected members.
  FRIEND_TEST(TestThreadPool, SetCapacity);
  FRIEND_TEST(TestGlobalThreadPool, Capacity);
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  ThreadPool();

  // Executor subclassing API: schedule `task` on this pool.
  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                   StopCallback&&) override;

  // Collect finished worker threads, making sure the OS threads have exited
  void CollectFinishedWorkersUnlocked();
  // Launch a given number of additional workers
  void LaunchWorkersUnlocked(int threads);
  // Get the current actual capacity
  int GetActualCapacity();

  // NOTE(review): presumably builds the pool returned by GetCpuThreadPool() -- confirm
  // against the implementation file.
  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();

  // NOTE(review): `state_` presumably aliases `sp_state_.get()`; both are initialized
  // outside this header -- confirm against the implementation file.
  std::shared_ptr<State> sp_state_;
  State* state_;
  // NOTE(review): by its name, controls whether the destructor shuts the pool down;
  // it has no in-class initializer, so the constructor is expected to set it -- confirm.
  bool shutdown_on_destroy_;
};
#else  // ARROW_ENABLE_THREADING
// An executor implementation which pretends to be a thread pool but runs everything
// on the main thread using a static queue (shared between all thread pools, otherwise
// cross-threadpool dependencies will break everything).
//
// This variant is compiled when ARROW_ENABLE_THREADING is not defined and is built on
// SerialExecutor instead of deriving from Executor directly.
class ARROW_EXPORT ThreadPool : public SerialExecutor {
 public:
  // Give GetCpuThreadPool() access to the protected members.
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  // Construct a thread pool; `threads` is the requested capacity.
  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  // Like Make(), but takes care that the returned ThreadPool is compatible
  // with destruction late at process exit.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  // Destroy thread pool; the pool will first be shut down
  ~ThreadPool() override;

  // Return the desired number of worker threads.
  // The actual number of workers may lag a bit before being adjusted to
  // match this value.
  int GetCapacity() override;

  virtual int GetActualCapacity();

  // Always true in this build: everything runs on the main thread.
  bool OwnsThisThread() override { return true; }

  // Dynamically change the number of worker threads.
  // without threading this is equal to the
  // number of tasks that can be running at once
  // (inside each other)
  Status SetCapacity(int threads);

  // Fixed default capacity used when threading is disabled.
  static int DefaultCapacity() { return 8; }

  // Shutdown the pool.  Once the pool starts shutting down, new tasks
  // cannot be submitted anymore.
  // If "wait" is true, shutdown waits for all pending tasks to be finished.
  // If "wait" is false, workers are stopped as soon as currently executing
  // tasks are finished.
  Status Shutdown(bool wait = true);

  // Wait for the thread pool to become idle
  //
  // This is useful for sequencing tests
  void WaitForIdle();

 protected:
  // NOTE(review): presumably builds the pool returned by GetCpuThreadPool() -- confirm
  // against the implementation file.
  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
  // Protected: instances are obtained through Make() / MakeEternal().
  ThreadPool();
};

#endif  // ARROW_ENABLE_THREADING

// Return the process-global thread pool for CPU-bound tasks.
//
// RunSynchronously() and IterateSynchronously() in this header pass this pool as the
// executor when `use_threads` is true.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();

/// \brief Run an async operation to completion, serially if use_threads is false
/// \see RunSerially
///
/// `get_future` is called (from this thread) with the chosen executor and must
/// return a future that will eventually finish.  This function returns once that
/// future has finished, yielding its synchronous result.
///
/// With `use_threads` set, the chosen executor is the global CPU executor.
/// Otherwise a temporary SerialExecutor is used.
template <typename Fut, typename ValueType = typename Fut::ValueType>
typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
                                        bool use_threads) {
  // Serial mode: hand the task over to a throwaway SerialExecutor
  if (!use_threads) {
    return SerialExecutor::RunInSerialExecutor<ValueType>(std::move(get_future));
  }
  // Threaded mode: start the work on the CPU pool, then wait for the result
  auto pooled_future = std::move(get_future)(GetCpuThreadPool());
  return FutureToSync(pooled_future);
}

/// \brief Potentially iterate an async generator serially (if use_threads is false)
/// \see IterateGenerator
///
/// If `use_threads` is true, the global CPU executor will be used.  Each call to
///   the iterator will simply wait until the next item is available.  Tasks may run in
///   the background between calls.
///
/// If `use_threads` is false, the calling thread only will be used.  Each call to
///   the iterator will use the calling thread to do enough work to generate one item.
///   Tasks will be left in a queue until the next call and no work will be done between
///   calls.
///
/// \param get_gen called with the chosen executor; returns the generator to iterate,
///        or an error
/// \param use_threads whether to run on the global CPU executor
/// \return an iterator over the generated items; if `get_gen` fails when called with
///         the CPU executor, an error iterator carrying the failure status
template <typename T>
Iterator<T> IterateSynchronously(
    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
  if (use_threads) {
    auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
    if (!maybe_gen.ok()) {
      return MakeErrorIterator<T>(maybe_gen.status());
    }
    // `maybe_gen` is a local that is not used again: move the generator out of it
    // (as SerialExecutor::IterateGenerator does) instead of copying the std::function.
    return MakeGeneratorIterator(maybe_gen.MoveValueUnsafe());
  } else {
    return SerialExecutor::IterateGenerator(std::move(get_gen));
  }
}

}  // namespace internal
}  // namespace arrow20