aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/core/lib/promise/activity.h
blob: 552b8e48aa3c9044bb7926f7780d6bc6b4625e05 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
#define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H

#include <grpc/support/port_platform.h>

#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <util/generic/string.h>
#include <util/string/cast.h>
#include <utility>

#include "y_absl/base/thread_annotations.h"
#include "y_absl/status/status.h"
#include "y_absl/types/optional.h"

#include <grpc/support/log.h>

#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"

namespace grpc_core {

class Activity;

// WakeupMask is a bitfield representing which parts of an activity should be
// woken up.
using WakeupMask = uint16_t;

// A Wakeable object is used by queues to wake activities.
class Wakeable {
 public:
  // Wake up the underlying activity.
  // After calling, this Wakeable cannot be used again.
  // WakeupMask comes from the activity that created this Wakeable and specifies
  // the set of promises that should be awoken.
  virtual void Wakeup(WakeupMask wakeup_mask) = 0;
  // Drop this wakeable without waking up the underlying activity.
  virtual void Drop(WakeupMask wakeup_mask) = 0;

  // Return the underlying activity debug tag, or "<unknown>" if not available.
  virtual TString ActivityDebugTag(WakeupMask wakeup_mask) const = 0;

 protected:
  inline ~Wakeable() {}
};

namespace promise_detail {
struct Unwakeable final : public Wakeable {
  void Wakeup(WakeupMask) override {}
  void Drop(WakeupMask) override {}
  TString ActivityDebugTag(WakeupMask) const override;
};
static Unwakeable* unwakeable() {
  return NoDestructSingleton<Unwakeable>::Get();
}
}  // namespace promise_detail

// An owning reference to a Wakeable.
// This type is non-copyable but movable.
class Waker {
 public:
  Waker(Wakeable* wakeable, WakeupMask wakeup_mask)
      : wakeable_and_arg_{wakeable, wakeup_mask} {}
  Waker() : Waker(promise_detail::unwakeable(), 0) {}
  ~Waker() { wakeable_and_arg_.Drop(); }
  Waker(const Waker&) = delete;
  Waker& operator=(const Waker&) = delete;
  Waker(Waker&& other) noexcept : wakeable_and_arg_(other.Take()) {}
  Waker& operator=(Waker&& other) noexcept {
    std::swap(wakeable_and_arg_, other.wakeable_and_arg_);
    return *this;
  }

  // Wake the underlying activity.
  void Wakeup() { Take().Wakeup(); }

  template <typename H>
  friend H AbslHashValue(H h, const Waker& w) {
    return H::combine(H::combine(std::move(h), w.wakeable_and_arg_.wakeable),
                      w.wakeable_and_arg_.wakeup_mask);
  }

  bool operator==(const Waker& other) const noexcept {
    return wakeable_and_arg_ == other.wakeable_and_arg_;
  }

  bool operator!=(const Waker& other) const noexcept {
    return !operator==(other);
  }

  TString ActivityDebugTag() {
    return wakeable_and_arg_.ActivityDebugTag();
  }

  // This is for tests to assert that a waker is occupied or not.
  bool is_unwakeable() const {
    return wakeable_and_arg_.wakeable == promise_detail::unwakeable();
  }

 private:
  struct WakeableAndArg {
    Wakeable* wakeable;
    WakeupMask wakeup_mask;

    void Wakeup() { wakeable->Wakeup(wakeup_mask); }
    void Drop() { wakeable->Drop(wakeup_mask); }
    TString ActivityDebugTag() const {
      return wakeable == nullptr ? "<unknown>"
                                 : wakeable->ActivityDebugTag(wakeup_mask);
    }
    bool operator==(const WakeableAndArg& other) const noexcept {
      return wakeable == other.wakeable && wakeup_mask == other.wakeup_mask;
    }
  };

  WakeableAndArg Take() {
    return std::exchange(wakeable_and_arg_, {promise_detail::unwakeable(), 0});
  }

  WakeableAndArg wakeable_and_arg_;
};

// Helper type to track wakeups between objects in the same activity.
// Can be fairly fast as no ref counting or locking needs to occur.
class IntraActivityWaiter {
 public:
  // Register for wakeup, return Pending(). If state is not ready to proceed,
  // Promises should bottom out here.
  Pending pending();
  // Wake the activity
  void Wake();

  TString DebugString() const;

 private:
  WakeupMask wakeups_ = 0;
};

// An Activity tracks execution of a single promise.
// It executes the promise under a mutex.
// When the promise stalls, it registers the containing activity to be woken up
// later.
// The activity takes a callback, which will be called exactly once with the
// result of execution.
// Activity execution may be cancelled by simply deleting the activity. In such
// a case, if execution had not already finished, the done callback would be
// called with y_absl::CancelledError().
class Activity : public Orphanable {
 public:
  // Force wakeup from the outside.
  // This should be rarely needed, and usages should be accompanied with a note
  // on why it's not possible to wakeup with a Waker object.
  // Nevertheless, it's sometimes useful for integrations with Activity to force
  // an Activity to repoll.
  void ForceWakeup() { MakeOwningWaker().Wakeup(); }

  // Force the current activity to immediately repoll if it doesn't complete.
  virtual void ForceImmediateRepoll(WakeupMask mask) = 0;
  // Legacy version of ForceImmediateRepoll() that uses the current participant.
  // Will go away once Party gets merged with Activity. New usage is banned.
  void ForceImmediateRepoll() { ForceImmediateRepoll(CurrentParticipant()); }

  // Return the current part of the activity as a bitmask
  virtual WakeupMask CurrentParticipant() const { return 1; }

  // Return the current activity.
  // Additionally:
  // - assert that there is a current activity (and catch bugs if there's not)
  // - indicate to thread safety analysis that the current activity is indeed
  //   locked
  // - back up that assertation with a runtime check in debug builds (it's
  //   prohibitively expensive in non-debug builds)
  static Activity* current() { return g_current_activity_; }

  // Produce an activity-owning Waker. The produced waker will keep the activity
  // alive until it's awoken or dropped.
  virtual Waker MakeOwningWaker() = 0;

  // Produce a non-owning Waker. The waker will own a small heap allocated weak
  // pointer to this activity. This is more suitable for wakeups that may not be
  // delivered until long after the activity should be destroyed.
  virtual Waker MakeNonOwningWaker() = 0;

  // Some descriptive text to add to log messages to identify this activity.
  virtual TString DebugTag() const;

 protected:
  // Check if this activity is the current activity executing on the current
  // thread.
  bool is_current() const { return this == g_current_activity_; }
  // Check if there is an activity executing on the current thread.
  static bool have_current() { return g_current_activity_ != nullptr; }
  // Set the current activity at construction, clean it up at destruction.
  class ScopedActivity {
   public:
    explicit ScopedActivity(Activity* activity)
        : prior_activity_(g_current_activity_) {
      g_current_activity_ = activity;
    }
    ~ScopedActivity() { g_current_activity_ = prior_activity_; }
    ScopedActivity(const ScopedActivity&) = delete;
    ScopedActivity& operator=(const ScopedActivity&) = delete;

   private:
    Activity* const prior_activity_;
  };

 private:
  // Set during RunLoop to the Activity that's executing.
  // Being set implies that mu_ is held.
  static thread_local Activity* g_current_activity_;
};

// Owned pointer to one Activity.
using ActivityPtr = OrphanablePtr<Activity>;

namespace promise_detail {

template <typename Context>
class ContextHolder {
 public:
  using ContextType = Context;

  explicit ContextHolder(Context value) : value_(std::move(value)) {}
  Context* GetContext() { return &value_; }

 private:
  Context value_;
};

template <typename Context>
class ContextHolder<Context*> {
 public:
  using ContextType = Context;

  explicit ContextHolder(Context* value) : value_(value) {}
  Context* GetContext() { return value_; }

 private:
  Context* value_;
};

template <typename Context, typename Deleter>
class ContextHolder<std::unique_ptr<Context, Deleter>> {
 public:
  using ContextType = Context;

  explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
      : value_(std::move(value)) {}
  Context* GetContext() { return value_.get(); }

 private:
  std::unique_ptr<Context, Deleter> value_;
};

template <typename HeldContext>
using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;

template <typename... Contexts>
class ActivityContexts : public ContextHolder<Contexts>... {
 public:
  explicit ActivityContexts(Contexts&&... contexts)
      : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}

  class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
   public:
    explicit ScopedContext(ActivityContexts* contexts)
        : Context<ContextTypeFromHeld<Contexts>>(
              static_cast<ContextHolder<Contexts>*>(contexts)
                  ->GetContext())... {
      // Silence `unused-but-set-parameter` in case of Contexts = {}
      (void)contexts;
    }
  };
};

// A free standing activity: an activity that owns its own synchronization and
// memory.
// The alternative is an activity that's somehow tied into another system, for
// instance the type seen in promise_based_filter.h as we're transitioning from
// the old filter stack to the new system.
// FreestandingActivity is-a Wakeable, but needs to increment a refcount before
// returning that Wakeable interface. Additionally, we want to keep
// FreestandingActivity as small as is possible, since it will be used
// everywhere. So we use inheritance to provide the Wakeable interface: this
// makes it zero sized, and we make the inheritance private to prevent
// accidental casting.
class FreestandingActivity : public Activity, private Wakeable {
 public:
  Waker MakeOwningWaker() final {
    Ref();
    return Waker(this, 0);
  }
  Waker MakeNonOwningWaker() final;

  void Orphan() final {
    Cancel();
    Unref();
  }

  void ForceImmediateRepoll(WakeupMask) final {
    mu_.AssertHeld();
    SetActionDuringRun(ActionDuringRun::kWakeup);
  }

 protected:
  // Action received during a run, in priority order.
  // If more than one action is received during a run, we use max() to resolve
  // which one to report (so Cancel overrides Wakeup).
  enum class ActionDuringRun : uint8_t {
    kNone,    // No action occured during run.
    kWakeup,  // A wakeup occured during run.
    kCancel,  // Cancel was called during run.
  };

  inline ~FreestandingActivity() override {
    if (handle_) {
      DropHandle();
    }
  }

  // Check if we got an internal wakeup since the last time this function was
  // called.
  ActionDuringRun GotActionDuringRun() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return std::exchange(action_during_run_, ActionDuringRun::kNone);
  }

  // Implementors of Wakeable::Wakeup should call this after the wakeup has
  // completed.
  void WakeupComplete() { Unref(); }

  // Set the action that occured during this run.
  // We use max to combine actions so that cancellation overrides wakeups.
  void SetActionDuringRun(ActionDuringRun action)
      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    action_during_run_ = std::max(action_during_run_, action);
  }

  Mutex* mu() Y_ABSL_LOCK_RETURNED(mu_) { return &mu_; }

  TString ActivityDebugTag(WakeupMask) const override { return DebugTag(); }

 private:
  class Handle;

  // Cancel execution of the underlying promise.
  virtual void Cancel() = 0;

  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  void Unref() {
    if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
      delete this;
    }
  }

  // Return a Handle instance with a ref so that it can be stored waiting for
  // some wakeup.
  Handle* RefHandle() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  // If our refcount is non-zero, ref and return true.
  // Otherwise, return false.
  bool RefIfNonzero();
  // Drop the (proved existing) wait handle.
  void DropHandle() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // All promise execution occurs under this mutex.
  Mutex mu_;

  // Current refcount.
  std::atomic<uint32_t> refs_{1};
  // If wakeup is called during Promise polling, we set this to Wakeup and
  // repoll. If cancel is called during Promise polling, we set this to Cancel
  // and cancel at the end of polling.
  ActionDuringRun action_during_run_ Y_ABSL_GUARDED_BY(mu_) =
      ActionDuringRun::kNone;
  // Handle for long waits. Allows a very small weak pointer type object to
  // queue for wakeups while Activity may be deleted earlier.
  Handle* handle_ Y_ABSL_GUARDED_BY(mu_) = nullptr;
};

// Implementation details for an Activity of an arbitrary type of promise.
// There should exist an inner template class `BoundScheduler` that provides
// the following interface:
// struct WakeupScheduler {
//   template <typename ActivityType>
//   class BoundScheduler {
//    public:
//     BoundScheduler(WakeupScheduler);
//     void ScheduleWakeup();
//   };
// };
// The ScheduleWakeup function should arrange that
// static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
// earliest opportunity.
// It can assume that activity will remain live until RunScheduledWakeup() is
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
// We use private inheritance here as a way of getting private members for each
// of the contexts.
// TODO(ctiller): We can probably reconsider the private inheritance here
// when we move away from C++11 and have more powerful template features.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final
    : public FreestandingActivity,
      public WakeupScheduler::template BoundScheduler<
          PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
      private ActivityContexts<Contexts...> {
 public:
  using Factory = OncePromiseFactory<void, F>;
  using ResultType = typename Factory::Promise::Result;

  PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
                  OnDone on_done, Contexts&&... contexts)
      : FreestandingActivity(),
        WakeupScheduler::template BoundScheduler<PromiseActivity>(
            std::move(wakeup_scheduler)),
        ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
        on_done_(std::move(on_done)) {
    // Lock, construct an initial promise from the factory, and step it.
    // This may hit a waiter, which could expose our this pointer to other
    // threads, meaning we do need to hold this mutex even though we're still
    // constructing.
    mu()->Lock();
    auto status = Start(Factory(std::move(promise_factory)));
    mu()->Unlock();
    // We may complete immediately.
    if (status.has_value()) {
      on_done_(std::move(*status));
    }
  }

  ~PromiseActivity() override {
    // We shouldn't destruct without calling Cancel() first, and that must get
    // us to be done_, so we assume that and have no logic to destruct the
    // promise here.
    GPR_ASSERT(done_);
  }

  void RunScheduledWakeup() {
    GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
    Step();
    WakeupComplete();
  }

 private:
  using typename ActivityContexts<Contexts...>::ScopedContext;

  void Cancel() final {
    if (Activity::is_current()) {
      mu()->AssertHeld();
      SetActionDuringRun(ActionDuringRun::kCancel);
      return;
    }
    bool was_done;
    {
      MutexLock lock(mu());
      // Check if we were done, and flag done.
      was_done = done_;
      if (!done_) {
        ScopedActivity scoped_activity(this);
        ScopedContext contexts(this);
        MarkDone();
      }
    }
    // If we were not done, then call the on_done callback.
    if (!was_done) {
      on_done_(y_absl::CancelledError());
    }
  }

  // Wakeup this activity. Arrange to poll the activity again at a convenient
  // time: this could be inline if it's deemed safe, or it could be by passing
  // the activity to an external threadpool to run. If the activity is already
  // running on this thread, a note is taken of such and the activity is
  // repolled if it doesn't complete.
  void Wakeup(WakeupMask) final {
    // If there is an active activity, but hey it's us, flag that and we'll loop
    // in RunLoop (that's calling from above here!).
    if (Activity::is_current()) {
      mu()->AssertHeld();
      SetActionDuringRun(ActionDuringRun::kWakeup);
      WakeupComplete();
      return;
    }
    if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
      // Can't safely run, so ask to run later.
      this->ScheduleWakeup();
    } else {
      // Already a wakeup scheduled for later, drop ref.
      WakeupComplete();
    }
  }

  // Drop a wakeup
  void Drop(WakeupMask) final { this->WakeupComplete(); }

  // Notification that we're no longer executing - it's ok to destruct the
  // promise.
  void MarkDone() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    GPR_ASSERT(!std::exchange(done_, true));
    ScopedContext contexts(this);
    Destruct(&promise_holder_.promise);
  }

  // In response to Wakeup, run the Promise state machine again until it
  // settles. Then check for completion, and if we have completed, call on_done.
  void Step() Y_ABSL_LOCKS_EXCLUDED(mu()) {
    // Poll the promise until things settle out under a lock.
    mu()->Lock();
    if (done_) {
      // We might get some spurious wakeups after finishing.
      mu()->Unlock();
      return;
    }
    auto status = RunStep();
    mu()->Unlock();
    if (status.has_value()) {
      on_done_(std::move(*status));
    }
  }

  // The main body of a step: set the current activity, and any contexts, and
  // then run the main polling loop. Contained in a function by itself in
  // order to keep the scoping rules a little easier in Step().
  y_absl::optional<ResultType> RunStep() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    ScopedActivity scoped_activity(this);
    ScopedContext contexts(this);
    return StepLoop();
  }

  // Similarly to RunStep, but additionally construct the promise from a
  // promise factory before entering the main loop. Called once from the
  // constructor.
  y_absl::optional<ResultType> Start(Factory promise_factory)
      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    ScopedActivity scoped_activity(this);
    ScopedContext contexts(this);
    Construct(&promise_holder_.promise, promise_factory.Make());
    return StepLoop();
  }

  // Until there are no wakeups from within and the promise is incomplete:
  // poll the promise.
  y_absl::optional<ResultType> StepLoop() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    GPR_ASSERT(is_current());
    while (true) {
      // Run the promise.
      GPR_ASSERT(!done_);
      auto r = promise_holder_.promise();
      if (auto* status = r.value_if_ready()) {
        // If complete, destroy the promise, flag done, and exit this loop.
        MarkDone();
        return IntoStatus(status);
      }
      // Continue looping til no wakeups occur.
      switch (GotActionDuringRun()) {
        case ActionDuringRun::kNone:
          return {};
        case ActionDuringRun::kWakeup:
          break;
        case ActionDuringRun::kCancel:
          MarkDone();
          return y_absl::CancelledError();
      }
    }
  }

  using Promise = typename Factory::Promise;
  // Callback on completion of the promise.
  GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
  // Has execution completed?
  GPR_NO_UNIQUE_ADDRESS bool done_ Y_ABSL_GUARDED_BY(mu()) = false;
  // Is there a wakeup scheduled?
  GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
  // We wrap the promise in a union to allow control over the construction
  // simultaneously with annotating mutex requirements and noting that the
  // promise contained may not use any memory.
  union PromiseHolder {
    PromiseHolder() {}
    ~PromiseHolder() {}
    GPR_NO_UNIQUE_ADDRESS Promise promise;
  };
  GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ Y_ABSL_GUARDED_BY(mu());
};

}  // namespace promise_detail

// Given a functor that returns a promise (a promise factory), a callback for
// completion, and a callback scheduler, construct an activity.
template <typename Factory, typename WakeupScheduler, typename OnDone,
          typename... Contexts>
ActivityPtr MakeActivity(Factory promise_factory,
                         WakeupScheduler wakeup_scheduler, OnDone on_done,
                         Contexts&&... contexts) {
  return ActivityPtr(
      new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
                                          Contexts...>(
          std::move(promise_factory), std::move(wakeup_scheduler),
          std::move(on_done), std::forward<Contexts>(contexts)...));
}

inline Pending IntraActivityWaiter::pending() {
  wakeups_ |= Activity::current()->CurrentParticipant();
  return Pending();
}

inline void IntraActivityWaiter::Wake() {
  if (wakeups_ == 0) return;
  Activity::current()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
}

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H