Add temporary support for Extension promotion concurrency.

PiperOrigin-RevId: 556931717
pull/13675/head^2
Protobuf Team Bot 1 year ago committed by Copybara-Service
parent c552102d66
commit b0f2c34096
  1. 3
      BUILD
  2. 34
      protos/BUILD
  3. 57
      protos/protos.cc
  4. 98
      protos/protos.h
  5. 39
      protos/protos_extension_lock.cc
  6. 54
      protos/protos_extension_lock.h
  7. 111
      protos/protos_extension_lock_test.cc
  8. 3
      protos/protos_internal.h

@ -241,11 +241,14 @@ cc_library(
copts = UPB_DEFAULT_COPTS, copts = UPB_DEFAULT_COPTS,
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":base",
":collections_internal", ":collections_internal",
":eps_copy_input_stream", ":eps_copy_input_stream",
":hash", ":hash",
":mem",
":message_accessors", ":message_accessors",
":message_internal", ":message_internal",
":message_typedef",
":mini_table", ":mini_table",
":port", ":port",
":upb", ":upb",

@ -72,13 +72,16 @@ cc_library(
copts = UPB_DEFAULT_CPPOPTS, copts = UPB_DEFAULT_CPPOPTS,
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":protos_extension_lock",
"//:mem", "//:mem",
"//:message_copy", "//:message_copy",
"//:message_promote", "//:message_promote",
"//:message_typedef",
"//:mini_table", "//:mini_table",
"//:upb", "//:upb",
"//:wire", "//:wire",
"//:wire_internal", "//:wire_internal",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/status", "@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor", "@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings", "@com_google_absl//absl/strings",
@ -103,6 +106,8 @@ cc_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":protos", ":protos",
"//:mem",
"//:message_typedef",
"//:mini_table", "//:mini_table",
"//:upb", "//:upb",
"@com_google_absl//absl/status", "@com_google_absl//absl/status",
@ -111,6 +116,19 @@ cc_library(
], ],
) )
cc_library(
name = "protos_extension_lock",
srcs = ["protos_extension_lock.cc"],
hdrs = ["protos_extension_lock.h"],
copts = UPB_DEFAULT_CPPOPTS,
visibility = ["//visibility:public"],
deps = [
"//:message_typedef",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/synchronization",
],
)
# Common support code for C++ generated code. # Common support code for C++ generated code.
cc_library( cc_library(
name = "generated_protos_support__only_for_generated_code_do_not_use__i_give_permission_to_break_me", name = "generated_protos_support__only_for_generated_code_do_not_use__i_give_permission_to_break_me",
@ -123,6 +141,8 @@ cc_library(
":protos", ":protos",
":protos_internal", ":protos_internal",
":repeated_field", ":repeated_field",
"//:mem",
"//:message_typedef",
], ],
) )
@ -154,3 +174,17 @@ cc_test(
"@com_google_googletest//:gtest_main", "@com_google_googletest//:gtest_main",
], ],
) )
cc_test(
name = "protos_extension_lock_test",
srcs = ["protos_extension_lock_test.cc"],
deps = [
"//:upb",
"//protos",
"//protos:protos_extension_lock",
"//protos_generator/tests:test_model_upb_cc_proto",
"@com_google_absl//absl/hash",
"@com_google_absl//absl/log:absl_check",
"@com_google_googletest//:gtest_main",
],
)

@ -30,9 +30,25 @@
#include "protos/protos.h" #include "protos/protos.h"
#include <atomic>
#include <cstddef>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "protos/protos_extension_lock.h"
#include "upb/mem/arena.h"
#include "upb/message/copy.h"
#include "upb/message/extension_internal.h"
#include "upb/message/promote.h" #include "upb/message/promote.h"
#include "upb/message/typedef.h"
#include "upb/mini_table/extension.h"
#include "upb/mini_table/extension_registry.h"
#include "upb/mini_table/message.h"
#include "upb/wire/common.h" #include "upb/wire/common.h"
#include "upb/wire/decode.h"
#include "upb/wire/encode.h"
namespace protos { namespace protos {
@ -95,8 +111,33 @@ upb_ExtensionRegistry* GetUpbExtensions(
return extension_registry.registry_; return extension_registry.registry_;
} }
/**
* MessageLock(msg) acquires lock on msg when constructed and releases it when
* destroyed.
*/
class MessageLock {
public:
explicit MessageLock(const upb_Message* msg) : msg_(msg) {
UpbExtensionLocker locker =
upb_extension_locker_global.load(std::memory_order_acquire);
unlocker_ = (locker != nullptr) ? locker(msg) : nullptr;
}
MessageLock(const MessageLock&) = delete;
void operator=(const MessageLock&) = delete;
~MessageLock() {
if (unlocker_ != nullptr) {
unlocker_(msg_);
}
}
private:
const upb_Message* msg_;
UpbExtensionUnlocker unlocker_;
};
bool HasExtensionOrUnknown(const upb_Message* msg, bool HasExtensionOrUnknown(const upb_Message* msg,
const upb_MiniTableExtension* eid) { const upb_MiniTableExtension* eid) {
MessageLock msg_lock(msg);
return _upb_Message_Getext(msg, eid) != nullptr || return _upb_Message_Getext(msg, eid) != nullptr ||
upb_MiniTable_FindUnknown(msg, eid->field.number, upb_MiniTable_FindUnknown(msg, eid->field.number,
kUpb_WireFormat_DefaultDepthLimit) kUpb_WireFormat_DefaultDepthLimit)
@ -105,12 +146,13 @@ bool HasExtensionOrUnknown(const upb_Message* msg,
const upb_Message_Extension* GetOrPromoteExtension( const upb_Message_Extension* GetOrPromoteExtension(
upb_Message* msg, const upb_MiniTableExtension* eid, upb_Arena* arena) { upb_Message* msg, const upb_MiniTableExtension* eid, upb_Arena* arena) {
MessageLock msg_lock(msg);
const upb_Message_Extension* ext = _upb_Message_Getext(msg, eid); const upb_Message_Extension* ext = _upb_Message_Getext(msg, eid);
if (ext == nullptr) { if (ext == nullptr) {
upb_GetExtension_Status ext_status = upb_MiniTable_GetOrPromoteExtension( upb_GetExtension_Status ext_status = upb_MiniTable_GetOrPromoteExtension(
(upb_Message*)msg, eid, kUpb_WireFormat_DefaultDepthLimit, arena, &ext); (upb_Message*)msg, eid, kUpb_WireFormat_DefaultDepthLimit, arena, &ext);
if (ext_status != kUpb_GetExtension_Ok) { if (ext_status != kUpb_GetExtension_Ok) {
return nullptr; ext = nullptr;
} }
} }
return ext; return ext;
@ -119,6 +161,7 @@ const upb_Message_Extension* GetOrPromoteExtension(
absl::StatusOr<absl::string_view> Serialize(const upb_Message* message, absl::StatusOr<absl::string_view> Serialize(const upb_Message* message,
const upb_MiniTable* mini_table, const upb_MiniTable* mini_table,
upb_Arena* arena, int options) { upb_Arena* arena, int options) {
MessageLock msg_lock(message);
size_t len; size_t len;
char* ptr; char* ptr;
upb_EncodeStatus status = upb_EncodeStatus status =
@ -129,6 +172,18 @@ absl::StatusOr<absl::string_view> Serialize(const upb_Message* message,
return MessageEncodeError(status); return MessageEncodeError(status);
} }
void DeepCopy(upb_Message* target, const upb_Message* source,
const upb_MiniTable* mini_table, upb_Arena* arena) {
MessageLock msg_lock(source);
upb_Message_DeepCopy(target, source, mini_table, arena);
}
upb_Message* DeepClone(const upb_Message* source,
const upb_MiniTable* mini_table, upb_Arena* arena) {
MessageLock msg_lock(source);
return upb_Message_DeepClone(source, mini_table, arena);
}
} // namespace internal } // namespace internal
} // namespace protos } // namespace protos

@ -135,52 +135,6 @@ typename T::Proxy CreateMessage(::protos::Arena& arena) {
arena.ptr()); arena.ptr());
} }
template <typename T>
typename T::Proxy CloneMessage(Ptr<T> message, upb::Arena& arena) {
return
typename T::Proxy(upb_Message_DeepClone(internal::GetInternalMsg(message),
T::minitable(), arena.ptr()),
arena.ptr());
}
template <typename T>
void DeepCopy(Ptr<const T> source_message, Ptr<T> target_message) {
static_assert(!std::is_const_v<T>);
upb_Message_DeepCopy(
internal::GetInternalMsg(target_message),
internal::GetInternalMsg(source_message), T::minitable(),
static_cast<upb_Arena*>(target_message->GetInternalArena()));
}
template <typename T>
void DeepCopy(Ptr<const T> source_message, T* target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(source_message, protos::Ptr(target_message));
}
template <typename T>
void DeepCopy(const T* source_message, Ptr<T> target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(protos::Ptr(source_message), target_message);
}
template <typename T>
void DeepCopy(const T* source_message, T* target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(protos::Ptr(source_message), protos::Ptr(target_message));
}
template <typename T>
void ClearMessage(Ptr<T> message) {
static_assert(!std::is_const_v<T>, "");
upb_Message_Clear(internal::GetInternalMsg(message), T::minitable());
}
template <typename T>
void ClearMessage(T* message) {
ClearMessage(protos::Ptr(message));
}
// begin:github_only // begin:github_only
// This type exists to work around an absl type that has not yet been // This type exists to work around an absl type that has not yet been
// released. // released.
@ -288,8 +242,60 @@ bool HasExtensionOrUnknown(const upb_Message* msg,
const upb_Message_Extension* GetOrPromoteExtension( const upb_Message_Extension* GetOrPromoteExtension(
upb_Message* msg, const upb_MiniTableExtension* eid, upb_Arena* arena); upb_Message* msg, const upb_MiniTableExtension* eid, upb_Arena* arena);
void DeepCopy(upb_Message* target, const upb_Message* source,
const upb_MiniTable* mini_table, upb_Arena* arena);
upb_Message* DeepClone(const upb_Message* source,
const upb_MiniTable* mini_table, upb_Arena* arena);
} // namespace internal } // namespace internal
template <typename T>
void DeepCopy(Ptr<const T> source_message, Ptr<T> target_message) {
static_assert(!std::is_const_v<T>);
::protos::internal::DeepCopy(
internal::GetInternalMsg(target_message),
internal::GetInternalMsg(source_message), T::minitable(),
static_cast<upb_Arena*>(target_message->GetInternalArena()));
}
template <typename T>
typename T::Proxy CloneMessage(Ptr<T> message, upb::Arena& arena) {
return typename T::Proxy(
::protos::internal::DeepClone(internal::GetInternalMsg(message),
T::minitable(), arena.ptr()),
arena.ptr());
}
template <typename T>
void DeepCopy(Ptr<const T> source_message, T* target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(source_message, protos::Ptr(target_message));
}
template <typename T>
void DeepCopy(const T* source_message, Ptr<T> target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(protos::Ptr(source_message), target_message);
}
template <typename T>
void DeepCopy(const T* source_message, T* target_message) {
static_assert(!std::is_const_v<T>);
DeepCopy(protos::Ptr(source_message), protos::Ptr(target_message));
}
template <typename T>
void ClearMessage(Ptr<T> message) {
static_assert(!std::is_const_v<T>, "");
upb_Message_Clear(internal::GetInternalMsg(message), T::minitable());
}
template <typename T>
void ClearMessage(T* message) {
ClearMessage(protos::Ptr(message));
}
class ExtensionRegistry { class ExtensionRegistry {
public: public:
ExtensionRegistry( ExtensionRegistry(

@ -0,0 +1,39 @@
// Protocol Buffers - Google's data interchange format
// Copyright 2023 Google LLC. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google LLC nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "protos/protos_extension_lock.h"
#include <atomic>
namespace protos::internal {
std::atomic<UpbExtensionLocker> upb_extension_locker_global;
} // namespace protos::internal

@ -0,0 +1,54 @@
// Protocol Buffers - Google's data interchange format
// Copyright 2023 Google LLC. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google LLC nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef UPB_PROTOS_PROTOS_EXTENSION_LOCK_H_
#define UPB_PROTOS_PROTOS_EXTENSION_LOCK_H_
#include <atomic>
namespace protos::internal {
// TODO(b/295355754): Temporary locking api for cross-language
// concurrency issue around extension api that uses lazy promotion
// from unknown data to upb_MiniTableExtension. Will be replaced by
// a core runtime solution in the future.
//
// Any api(s) using unknown or extension data (GetOrPromoteExtension,
// Serialize and others) call lock/unlock to provide a way for
// mixed language implementations to avoid race conditions)
using UpbExtensionUnlocker = void (*)(const void*);
using UpbExtensionLocker = UpbExtensionUnlocker (*)(const void*);
// TODO(b/295355754): Expose as function instead of global.
extern std::atomic<UpbExtensionLocker> upb_extension_locker_global;
} // namespace protos::internal
#endif // UPB_PROTOS_PROTOS_EXTENSION_LOCK_H_

@ -0,0 +1,111 @@
#include "protos/protos_extension_lock.h"
#include <atomic>
#include <mutex>
#include <string>
#include <thread>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/hash/hash.h"
#include "absl/log/absl_check.h"
#include "protos/protos.h"
#include "protos_generator/tests/test_model.upb.proto.h"
#include "upb/upb.hpp"
#ifndef ASSERT_OK
#define ASSERT_OK(x) ASSERT_TRUE(x.ok())
#endif // ASSERT_OK
#ifndef EXPECT_OK
#define EXPECT_OK(x) EXPECT_TRUE(x.ok())
#endif // EXPECT_OK
namespace protos_generator::test::protos {
namespace {
std::string GenerateTestData() {
TestModel model;
model.set_str1("str");
ThemeExtension extension1;
extension1.set_ext_name("theme");
ABSL_CHECK_OK(::protos::SetExtension(&model, theme, extension1));
ThemeExtension extension2;
extension2.set_ext_name("theme_extension");
ABSL_CHECK_OK(::protos::SetExtension(&model, ThemeExtension::theme_extension,
extension2));
::upb::Arena arena;
auto bytes = ::protos::Serialize(&model, arena);
ABSL_CHECK_OK(bytes);
return std::string(bytes->data(), bytes->size());
}
std::mutex m[8];
void unlock_func(const void* msg) { m[absl::HashOf(msg) & 0x7].unlock(); }
::protos::internal::UpbExtensionUnlocker lock_func(const void* msg) {
m[absl::HashOf(msg) & 0x7].lock();
return &unlock_func;
}
void TestConcurrentExtensionAccess(::protos::ExtensionRegistry registry) {
::protos::internal::upb_extension_locker_global.store(
&lock_func, std::memory_order_release);
const std::string payload = GenerateTestData();
TestModel parsed_model =
::protos::Parse<TestModel>(payload, registry).value();
const auto test_main = [&] { EXPECT_EQ("str", parsed_model.str1()); };
const auto test_theme = [&] {
ASSERT_TRUE(::protos::HasExtension(&parsed_model, theme));
auto ext = ::protos::GetExtension(&parsed_model, theme);
ASSERT_OK(ext);
EXPECT_EQ((*ext)->ext_name(), "theme");
};
const auto test_theme_extension = [&] {
auto ext =
::protos::GetExtension(&parsed_model, ThemeExtension::theme_extension);
ASSERT_OK(ext);
EXPECT_EQ((*ext)->ext_name(), "theme_extension");
};
const auto test_serialize = [&] {
::upb::Arena arena;
EXPECT_OK(::protos::Serialize(&parsed_model, arena));
};
std::thread t1(test_main);
std::thread t2(test_main);
std::thread t3(test_theme);
std::thread t4(test_theme);
std::thread t5(test_theme_extension);
std::thread t6(test_theme_extension);
std::thread t7(test_serialize);
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
test_main();
test_theme();
test_theme_extension();
}
TEST(CppGeneratedCode, ConcurrentAccessDoesNotRaceBothLazy) {
::upb::Arena arena;
TestConcurrentExtensionAccess({{}, arena});
}
TEST(CppGeneratedCode, ConcurrentAccessDoesNotRaceOneLazyOneEager) {
::upb::Arena arena;
TestConcurrentExtensionAccess({{&theme}, arena});
TestConcurrentExtensionAccess({{&ThemeExtension::theme_extension}, arena});
}
TEST(CppGeneratedCode, ConcurrentAccessDoesNotRaceBothEager) {
::upb::Arena arena;
TestConcurrentExtensionAccess(
{{&theme, &ThemeExtension::theme_extension}, arena});
}
} // namespace
} // namespace protos_generator::test::protos

@ -31,7 +31,8 @@
#ifndef UPB_PROTOS_PROTOS_INTERNAL_H_ #ifndef UPB_PROTOS_PROTOS_INTERNAL_H_
#define UPB_PROTOS_PROTOS_INTERNAL_H_ #define UPB_PROTOS_PROTOS_INTERNAL_H_
#include "protos/protos.h" #include "upb/mem/arena.h"
#include "upb/message/typedef.h"
namespace protos::internal { namespace protos::internal {

Loading…
Cancel
Save