diff --git a/upb/mem/arena.c b/upb/mem/arena.c
index 43b254df0f..311c712634 100644
--- a/upb/mem/arena.c
+++ b/upb/mem/arena.c
@@ -31,11 +31,6 @@
 // Must be last.
 #include "upb/port/def.inc"
 
-static uintptr_t upb_cleanup_metadata(uint32_t* cleanup,
-                                      bool has_initial_block) {
-  return (uintptr_t)cleanup | has_initial_block;
-}
-
 struct _upb_MemBlock {
   // Atomic only for the benefit of SpaceAllocated().
   UPB_ATOMIC(_upb_MemBlock*) next;
@@ -46,10 +41,16 @@ struct _upb_MemBlock {
 static const size_t memblock_reserve =
     UPB_ALIGN_UP(sizeof(_upb_MemBlock), UPB_MALLOC_ALIGN);
 
-static upb_Arena* _upb_Arena_FindRoot(upb_Arena* a) {
+typedef struct _upb_ArenaRoot {
+  upb_Arena* root;
+  uintptr_t tagged_count;
+} _upb_ArenaRoot;
+
+static _upb_ArenaRoot _upb_Arena_FindRoot(upb_Arena* a) {
   uintptr_t poc = upb_Atomic_Load(&a->parent_or_count, memory_order_acquire);
   while (_upb_Arena_IsTaggedPointer(poc)) {
     upb_Arena* next = _upb_Arena_PointerFromTagged(poc);
+    UPB_ASSERT(a != next);
     uintptr_t next_poc =
         upb_Atomic_Load(&next->parent_or_count, memory_order_acquire);
 
@@ -73,20 +74,22 @@ static upb_Arena* _upb_Arena_FindRoot(upb_Arena* a) {
       // further away over time, but the path towards that root will continue to
       // be valid and the creation of the path carries all the memory orderings
      // required.
+      UPB_ASSERT(a != _upb_Arena_PointerFromTagged(next_poc));
       upb_Atomic_Store(&a->parent_or_count, next_poc, memory_order_relaxed);
     }
     a = next;
     poc = next_poc;
   }
-  return a;
+  return (_upb_ArenaRoot){.root = a, .tagged_count = poc};
 }
 
 size_t upb_Arena_SpaceAllocated(upb_Arena* arena) {
-  arena = _upb_Arena_FindRoot(arena);
+  arena = _upb_Arena_FindRoot(arena).root;
   size_t memsize = 0;
 
   while (arena != NULL) {
-    _upb_MemBlock* block = arena->blocks;
+    _upb_MemBlock* block =
+        upb_Atomic_Load(&arena->blocks, memory_order_relaxed);
     while (block != NULL) {
       memsize += sizeof(_upb_MemBlock) + block->size;
       block = upb_Atomic_Load(&block->next, memory_order_relaxed);
@@ -112,9 +115,9 @@ static void upb_Arena_AddBlock(upb_Arena* a, void* ptr, size_t size) {
   _upb_MemBlock* block = ptr;
 
   // Insert into linked list.
-  upb_Atomic_Init(&block->next, a->blocks);
   block->size = (uint32_t)size;
-  upb_Atomic_Store(&a->blocks, block, memory_order_relaxed);
+  upb_Atomic_Init(&block->next, a->blocks);
+  upb_Atomic_Store(&a->blocks, block, memory_order_release);
 
   a->head.ptr = UPB_PTR_AT(block, memblock_reserve, char);
   a->head.end = UPB_PTR_AT(block, size, char);
@@ -124,7 +127,7 @@
 
 static bool upb_Arena_AllocBlock(upb_Arena* a, size_t size) {
   if (!a->block_alloc) return false;
-  _upb_MemBlock* last_block = upb_Atomic_Load(&a->blocks, memory_order_relaxed);
+  _upb_MemBlock* last_block = upb_Atomic_Load(&a->blocks, memory_order_acquire);
   size_t last_size = last_block != NULL ? last_block->size : 128;
   size_t block_size = UPB_MAX(size, last_size * 2) + memblock_reserve;
   _upb_MemBlock* block = upb_malloc(upb_Arena_BlockAlloc(a), block_size);
@@ -207,11 +210,11 @@ static void arena_dofree(upb_Arena* a) {
     upb_Arena* next_arena =
         (upb_Arena*)upb_Atomic_Load(&a->next, memory_order_acquire);
     upb_alloc* block_alloc = upb_Arena_BlockAlloc(a);
-    _upb_MemBlock* block = upb_Atomic_Load(&a->blocks, memory_order_relaxed);
+    _upb_MemBlock* block = upb_Atomic_Load(&a->blocks, memory_order_acquire);
     while (block != NULL) {
       // Load first since we are deleting block.
       _upb_MemBlock* next_block =
-          upb_Atomic_Load(&block->next, memory_order_relaxed);
+          upb_Atomic_Load(&block->next, memory_order_acquire);
       upb_free(block_alloc, block);
       block = next_block;
     }
@@ -235,7 +238,7 @@ retry:
     return;
   }
 
-  if (upb_Atomic_CompareExchangeStrong(
+  if (upb_Atomic_CompareExchangeWeak(
          &a->parent_or_count, &poc,
          _upb_Arena_TaggedFromRefcount(_upb_Arena_RefCountFromTagged(poc) - 1),
          memory_order_release, memory_order_acquire)) {
@@ -248,10 +251,9 @@ retry:
   goto retry;
 }
 
-bool upb_Arena_Fuse(upb_Arena* a1, upb_Arena* a2) {
-  // SAFE IN THE PRESENCE OF FUSE/FREE RACES BUT NOT IN THE
-  // PRESENCE OF FUSE/FUSE RACES!!!
-  //
+#define kUpb_RefDelta_CannotFuse -1
+
+upb_Arena* upb_Arena_DoFuse(upb_Arena* a1, upb_Arena* a2, intptr_t* ref_delta) {
   // `parent_or_count` has two disctint modes
   // - parent pointer mode
   // - refcount mode
@@ -259,86 +261,104 @@ bool upb_Arena_Fuse(upb_Arena* a1, upb_Arena* a2) {
   // In parent pointer mode, it may change what pointer it refers to in the
   // tree, but it will always approach a root. Any operation that walks the
   // tree to the root may collapse levels of the tree concurrently.
-  //
-  // In refcount mode, any free operation may lower the refcount.
-  //
-  // Only a fuse operation may increase the refcount.
-  // Only a fuse operation may switch `parent_or_count` from parent mode to
-  // refcount mode.
-  //
-  // Given that we do not allow fuse/fuse races, we may rely on the invariant
-  // that only refcounts can change once we have found the root. Because the
-  // threads doing the fuse must hold references, we can guarantee that no
-  // refcounts will reach zero concurrently.
+  _upb_ArenaRoot r1 = _upb_Arena_FindRoot(a1);
+  _upb_ArenaRoot r2 = _upb_Arena_FindRoot(a2);
 
-  upb_Arena* r1 = _upb_Arena_FindRoot(a1);
-  upb_Arena* r2 = _upb_Arena_FindRoot(a2);
-
-  if (r1 == r2) return true;  // Already fused.
+  if (r1.root == r2.root) return r1.root;  // Already fused.
 
   // Do not fuse initial blocks since we cannot lifetime extend them.
   // Any other fuse scenario is allowed.
-  if (upb_Arena_HasInitialBlock(r1)) return false;
-  if (upb_Arena_HasInitialBlock(r2)) return false;
-
-  uintptr_t r1_poc =
-      upb_Atomic_Load(&r1->parent_or_count, memory_order_acquire);
-  uintptr_t r2_poc =
-      upb_Atomic_Load(&r2->parent_or_count, memory_order_acquire);
-  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r1_poc));
-  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r2_poc));
-
-  // Keep the tree shallow by joining the smaller tree to the larger.
-  if (_upb_Arena_RefCountFromTagged(r1_poc) <
-      _upb_Arena_RefCountFromTagged(r2_poc)) {
-    upb_Arena* tmp = r1;
+  if (upb_Arena_HasInitialBlock(r1.root) ||
+      upb_Arena_HasInitialBlock(r2.root)) {
+    *ref_delta = kUpb_RefDelta_CannotFuse;
+    return NULL;
+  }
+
+  // Avoid cycles by always fusing into the root with the lower address.
+  if ((uintptr_t)r1.root > (uintptr_t)r2.root) {
+    _upb_ArenaRoot tmp = r1;
     r1 = r2;
     r2 = tmp;
-
-    uintptr_t tmp_poc = r1_poc;
-    r1_poc = r2_poc;
-    r2_poc = tmp_poc;
   }
 
   // The moment we install `r1` as the parent for `r2` all racing frees may
-  // immediately begin decrementing `r1`'s refcount. So we must install all the
-  // refcounts that we know about first to prevent a premature unref to zero.
-  uint32_t r2_refcount = _upb_Arena_RefCountFromTagged(r2_poc);
-  upb_Atomic_Add(&r1->parent_or_count, ((uintptr_t)r2_refcount) << 1,
-                 memory_order_release);
-
-  // When installing `r1` as the parent for `r2` racing frees may have changed
-  // the refcount for `r2` so we need to capture the old value to fix up `r1`'s
-  // refcount based on the delta from what we saw the first time.
-  r2_poc = upb_Atomic_Exchange(&r2->parent_or_count,
-                               _upb_Arena_TaggedFromPointer(r1),
-                               memory_order_acq_rel);
-  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r2_poc));
-  uint32_t delta_refcount = r2_refcount - _upb_Arena_RefCountFromTagged(r2_poc);
-  if (delta_refcount != 0) {
-    upb_Atomic_Sub(&r1->parent_or_count, ((uintptr_t)delta_refcount) << 1,
-                   memory_order_release);
+  // immediately begin decrementing `r1`'s refcount (including pending
+  // increments to that refcount and their frees!). We need to add `r2`'s refs
+  // now, so that `r1` can withstand any unrefs that come from r2.
+  //
+  // Note that while it is possible for `r2`'s refcount to increase
+  // asynchronously, we will not actually do the reparenting operation below
+  // unless `r2`'s refcount is unchanged from when we read it.
+  //
+  // Note that we may have done this previously, either to this node or a
+  // different node, during a previous and failed DoFuse() attempt. But we will
+  // not lose track of these refs because we always add them to our overall
+  // delta.
+  uintptr_t r2_untagged_count = r2.tagged_count & ~1;
+  uintptr_t with_r2_refs = r1.tagged_count + r2_untagged_count;
+  if (!upb_Atomic_CompareExchangeStrong(
+          &r1.root->parent_or_count, &r1.tagged_count, with_r2_refs,
+          memory_order_release, memory_order_acquire)) {
+    return NULL;
   }
 
-  // Now append r2's linked list of arenas to r1's.
-  upb_Arena* r2_tail = upb_Atomic_Load(&r2->tail, memory_order_relaxed);
-  upb_Arena* r1_tail = upb_Atomic_Load(&r1->tail, memory_order_relaxed);
-  upb_Arena* r1_next = upb_Atomic_Load(&r1_tail->next, memory_order_relaxed);
-  while (r1_next != NULL) {
-    // r1->tail was stale. This can happen, but tail should always converge on
-    // the true tail.
-    r1_tail = r1_next;
-    r1_next = upb_Atomic_Load(&r1_tail->next, memory_order_relaxed);
+  // Perform the actual fuse by removing the refs from `r2` and swapping in the
+  // parent pointer.
+  if (upb_Atomic_CompareExchangeWeak(
+          &r2.root->parent_or_count, &r2.tagged_count,
+          _upb_Arena_TaggedFromPointer(r1.root), memory_order_release,
+          memory_order_acquire)) {
+  } else {
+    // We'll need to remove the excess refs we added to r1 previously.
+    *ref_delta -= r2_untagged_count;
+    return NULL;
   }
 
-  upb_Arena* old_next =
-      upb_Atomic_Exchange(&r1_tail->next, r2, memory_order_relaxed);
+  // Now that the fuse has been performed (and can no longer fail) we need to
+  // append `r2` to `r1`'s linked list. Find the region for `r2`'s linked list.
+  upb_Arena* r1_tail = upb_Atomic_Load(&r1.root->tail, memory_order_relaxed);
+  while (true) {
+    upb_Arena* r1_next = upb_Atomic_Load(&r1_tail->next, memory_order_relaxed);
+    while (r1_next != NULL) {
+      // r1->tail was stale. This can happen, but tail should always converge
+      // on the true tail.
+      r1_tail = r1_next;
+      r1_next = upb_Atomic_Load(&r1_tail->next, memory_order_relaxed);
+    }
+    if (upb_Atomic_CompareExchangeStrong(&r1_tail->next, &r1_next, r2.root,
+                                         memory_order_relaxed,
+                                         memory_order_relaxed)) {
+      break;
+    }
+  }
 
-  // Once fuse/fuse races are allowed, it will need to be a CAS instead that
-  // handles this mismatch gracefully.
-  UPB_ASSERT(old_next == NULL);
+  upb_Arena* r2_tail = upb_Atomic_Load(&r2.root->tail, memory_order_relaxed);
+  upb_Atomic_Store(&r1.root->tail, r2_tail, memory_order_relaxed);
+  return r1.root;
+}
 
-  upb_Atomic_Store(&r1->tail, r2_tail, memory_order_relaxed);
+bool upb_Arena_FixupRefs(upb_Arena* new_root, intptr_t ref_delta) {
+  if (ref_delta == 0) return true;  // No fixup required.
+  uintptr_t poc =
+      upb_Atomic_Load(&new_root->parent_or_count, memory_order_relaxed);
+  if (_upb_Arena_IsTaggedPointer(poc)) return false;
+  uintptr_t with_refs = poc + ref_delta;
+  UPB_ASSERT(!_upb_Arena_IsTaggedPointer(with_refs));
+  return upb_Atomic_CompareExchangeStrong(&new_root->parent_or_count, &poc,
+                                          with_refs, memory_order_relaxed,
+                                          memory_order_relaxed);
+}
 
-  return true;
+bool upb_Arena_Fuse(upb_Arena* a1, upb_Arena* a2) {
+  // The number of refs we ultimately need to transfer to the new root.
+  intptr_t ref_delta = 0;
+
+  while (true) {
+    upb_Arena* new_root = upb_Arena_DoFuse(a1, a2, &ref_delta);
+    if (new_root != NULL) {
+      if (upb_Arena_FixupRefs(new_root, ref_delta)) return true;
+    } else {
+      if (ref_delta == kUpb_RefDelta_CannotFuse) return false;
+    }
+  }
 }
diff --git a/upb/mem/arena_internal.h b/upb/mem/arena_internal.h
index e5a81f0d1a..cecc4d7a9d 100644
--- a/upb/mem/arena_internal.h
+++ b/upb/mem/arena_internal.h
@@ -72,13 +72,13 @@ UPB_INLINE bool _upb_Arena_IsTaggedPointer(uintptr_t parent_or_count) {
   return (parent_or_count & 1) == 0;
 }
 
-UPB_INLINE uint32_t _upb_Arena_RefCountFromTagged(uintptr_t parent_or_count) {
+UPB_INLINE uintptr_t _upb_Arena_RefCountFromTagged(uintptr_t parent_or_count) {
   UPB_ASSERT(_upb_Arena_IsTaggedRefcount(parent_or_count));
   return parent_or_count >> 1;
 }
 
-UPB_INLINE uintptr_t _upb_Arena_TaggedFromRefcount(uint32_t refcount) {
-  uintptr_t parent_or_count = (((uintptr_t)refcount) << 1) | 1;
+UPB_INLINE uintptr_t _upb_Arena_TaggedFromRefcount(uintptr_t refcount) {
+  uintptr_t parent_or_count = (refcount << 1) | 1;
   UPB_ASSERT(_upb_Arena_IsTaggedRefcount(parent_or_count));
   return parent_or_count;
 }
diff --git a/upb/mem/arena_test.cc b/upb/mem/arena_test.cc
index 87fadfd60a..7d5453cf89 100644
--- a/upb/mem/arena_test.cc
+++ b/upb/mem/arena_test.cc
@@ -139,8 +139,7 @@ TEST(ArenaTest, FuzzFuseFreeRace) {
   for (auto& t : threads) t.join();
 }
 
-// Disabled because this operation is currently unsupported.
-TEST(ArenaTest, DISABLED_FuzzFuseFuseRace) {
+TEST(ArenaTest, FuzzFuseFuseRace) {
   Environment env;
 
   absl::Notification done;
@@ -149,7 +148,7 @@ TEST(ArenaTest, DISABLED_FuzzFuseFuseRace) {
     threads.emplace_back([&]() {
      absl::BitGen gen;
       while (!done.HasBeenNotified()) {
-        env.RandomPoke(gen);
+        env.RandomFuse(gen);
       }
     });
   }
@@ -157,7 +156,7 @@ TEST(ArenaTest, DISABLED_FuzzFuseFuseRace) {
   absl::BitGen gen;
   auto end = absl::Now() + absl::Seconds(2);
   while (absl::Now() < end) {
-    env.RandomPoke(gen);
+    env.RandomFuse(gen);
   }
   done.Notify();
   for (auto& t : threads) t.join();
diff --git a/upb/port/atomic.h b/upb/port/atomic.h
index 450297ae50..506acdedd7 100644
--- a/upb/port/atomic.h
+++ b/upb/port/atomic.h
@@ -49,6 +49,10 @@
                                          success_order, failure_order) \
   atomic_compare_exchange_strong_explicit(addr, expected, desired,     \
                                           success_order, failure_order)
+#define upb_Atomic_CompareExchangeWeak(addr, expected, desired, success_order, \
+                                       failure_order)                          \
+  atomic_compare_exchange_weak_explicit(addr, expected, desired,               \
+                                        success_order, failure_order)
 
 #else  // !UPB_USE_C11_ATOMICS
 
@@ -108,6 +112,12 @@ UPB_INLINE bool _upb_NonAtomic_CompareExchangeStrongP(upb_Arena** addr,
       uintptr_t: _upb_NonAtomic_CompareExchangeStrongU,                \
       upb_Arena *: _upb_NonAtomic_CompareExchangeStrongP)(addr, expected, \
                                                           desired)
+#define upb_Atomic_CompareExchangeWeak(addr, expected, desired, success_order, \
+                                       failure_order)                          \
+  _Generic((desired),                                                          \
+      uintptr_t: _upb_NonAtomic_CompareExchangeStrongU,                        \
+      upb_Arena *: _upb_NonAtomic_CompareExchangeStrongP)(addr, expected,      \
+                                                          desired)
 
 #endif
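For context on what the reworked fuse path above has to preserve, here is a minimal caller-side sketch. It is not part of the patch; it assumes the public arena API declared in upb/mem/arena.h (upb_Arena_New(), upb_Arena_Malloc(), upb_Arena_Fuse(), upb_Arena_Free()). A successful fuse joins the arenas into one group whose blocks are released only after every member has been freed, in any order; the DoFuse()/FixupRefs() retry loop additionally makes concurrent upb_Arena_Fuse() calls on overlapping arenas safe, which is what the re-enabled FuzzFuseFuseRace test exercises.

```c
// Illustrative sketch only -- not part of the patch above. Assumes the
// public arena API from "upb/mem/arena.h"; only the single-threaded
// contract is shown here.
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#include "upb/mem/arena.h"

int main(void) {
  upb_Arena* a1 = upb_Arena_New();
  upb_Arena* a2 = upb_Arena_New();

  char* p1 = upb_Arena_Malloc(a1, 64);
  char* p2 = upb_Arena_Malloc(a2, 64);
  assert(p1 != NULL && p2 != NULL);

  // Join the two lifetimes. With this patch, racing upb_Arena_Fuse() calls on
  // overlapping arenas from other threads would also be permitted.
  bool fused = upb_Arena_Fuse(a1, a2);
  assert(fused);  // Fails only for arenas created on user-provided blocks.

  // After a successful fuse the arenas form one group: blocks are released
  // only when every member has been freed, and the frees may happen in any
  // order (or on different threads).
  upb_Arena_Free(a1);
  memset(p1, 0xff, 64);  // Still valid: a2 keeps the fused group alive.
  memset(p2, 0xff, 64);
  upb_Arena_Free(a2);  // Last free releases the blocks of both arenas.
  return 0;
}
```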