mirror of https://github.com/grpc/grpc.git
[promises] Add a bridge between callbacks and promises (#33792)
Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/34082/head
parent
dd8c505d92
commit
361769c905
7 changed files with 243 additions and 0 deletions
@ -0,0 +1,69 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_WAIT_FOR_CALLBACK_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_WAIT_FOR_CALLBACK_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Bridge callback interfaces and promise interfaces.
|
||||
// This class helps bridge older callback interfaces with promises:
|
||||
// MakeWaitPromise() returns a promise that will wait until a callback created
|
||||
// by MakeCallback() has been invoked.
|
||||
class WaitForCallback { |
||||
public: |
||||
// Creates a promise that blocks until the callback is invoked.
|
||||
auto MakeWaitPromise() { |
||||
return [state = state_]() -> Poll<Empty> { |
||||
MutexLock lock(&state->mutex); |
||||
if (state->done) return Empty{}; |
||||
state->waker = Activity::current()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
}; |
||||
} |
||||
|
||||
// Creates a callback that unblocks the promise.
|
||||
auto MakeCallback() { |
||||
return [state = state_]() { |
||||
ReleasableMutexLock lock(&state->mutex); |
||||
state->done = true; |
||||
auto waker = std::move(state->waker); |
||||
lock.Release(); |
||||
waker.Wakeup(); |
||||
}; |
||||
} |
||||
|
||||
private: |
||||
struct State { |
||||
Mutex mutex; |
||||
bool done ABSL_GUARDED_BY(mutex) = false; |
||||
Waker waker ABSL_GUARDED_BY(mutex); |
||||
}; |
||||
const std::shared_ptr<State> state_{std::make_shared<State>()}; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_WAIT_FOR_CALLBACK_H
|
@ -0,0 +1,50 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/wait_for_callback.h" |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "test/core/promise/test_wakeup_schedulers.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TEST(WaitForCallbackTest, Works) { |
||||
WaitForCallback w4cb; |
||||
auto callback = w4cb.MakeCallback(); |
||||
Notification done; |
||||
auto activity = MakeActivity( |
||||
[&w4cb]() { |
||||
return Map(w4cb.MakeWaitPromise(), |
||||
[](Empty) { return absl::OkStatus(); }); |
||||
}, |
||||
InlineWakeupScheduler{}, |
||||
[&done](const absl::Status& s) { |
||||
EXPECT_TRUE(s.ok()); |
||||
done.Notify(); |
||||
}); |
||||
EXPECT_FALSE(done.HasBeenNotified()); |
||||
callback(); |
||||
done.WaitForNotification(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue