@@ -209,6 +209,12 @@ struct ConcurrentQueueDefaultTraits
 	// internal queue.
 	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
 	
+	// The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
+	// Enqueue operations that would cause this limit to be surpassed will fail. Note
+	// that this limit is enforced at the block level (for performance reasons), i.e.
+	// it's rounded up to the nearest block size.
+	static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
+	
 	
 	// Memory allocation can be customized if needed.
 	// malloc should return nullptr on failure, and handle alignment like std::malloc.
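
The new trait is opt-in: the default of details::const_numeric_max<size_t>::value leaves sub-queues effectively unbounded. A minimal sketch of how a cap could be supplied through a custom traits struct (the BoundedTraits name and the 100000 value are illustrative, not part of this commit; assumes the header from this commit is included):

    struct BoundedTraits : public tracy::moodycamel::ConcurrentQueueDefaultTraits
    {
        // Hypothetical cap; internally rounded up to a multiple of BLOCK_SIZE.
        static const size_t MAX_SUBQUEUE_SIZE = 100000;
    };

    tracy::moodycamel::ConcurrentQueue<int, BoundedTraits> q;
    bool ok = q.enqueue(42);  // returns false once the (rounded) cap would be exceeded
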
@@ -291,6 +297,63 @@ namespace details
 		++x;
 		return x;
 	}
+	
+	template<typename T>
+	static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
+	{
+		T temp = std::move(left.load(std::memory_order_relaxed));
+		left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
+		right.store(std::move(temp), std::memory_order_relaxed);
+	}
+	
+	template<typename T>
+	static inline T const& nomove(T const& x)
+	{
+		return x;
+	}
+	
+	template<bool Enable>
+	struct nomove_if
+	{
+		template<typename T>
+		static inline T const& eval(T const& x)
+		{
+			return x;
+		}
+	};
+	
+	template<>
+	struct nomove_if<false>
+	{
+		template<typename U>
+		static inline auto eval(U&& x)
+			-> decltype(std::forward<U>(x))
+		{
+			return std::forward<U>(x);
+		}
+	};
+	
+	template<typename It>
+	static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
+	{
+		return *it;
+	}
+	
+#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
+	template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
+#else
+	template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
+#endif
+	
+	template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
+	template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
+	template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
+	template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
+	template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
+	template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
+	template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
+	template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
+	template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
 }
 
 
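The static_is_lock_free_num specializations simply surface the standard ATOMIC_*_LOCK_FREE macros (0 = never lock-free, 1 = sometimes, 2 = always), and make_signed lets unsigned integral types reuse the signed mappings. A small sketch of what the value-2 test means, assuming the tracy::moodycamel::details nesting used in this file:

    // Illustrative compile-time assertion built on these helpers:
    // '2' means the corresponding std::atomic specialization is always lock-free.
    static_assert(tracy::moodycamel::details::static_is_lock_free<void*>::value == 2,
        "std::atomic<void*> must be lock-free for the queue's fast paths to stay lock-free");
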
@@ -325,6 +388,16 @@ struct ProducerToken
 		}
 	}
 	
+	// A token is always valid unless:
+	//     1) Memory allocation failed during construction
+	//     2) It was moved via the move constructor
+	//        (Note: assignment does a swap, leaving both potentially valid)
+	//     3) The associated queue was destroyed
+	// Note that if valid() returns true, that only indicates
+	// that the token is valid for use with a specific queue,
+	// but not which one; that's up to the user to track.
+	inline bool valid() const { return producer != nullptr; }
+	
 	~ProducerToken()
 	{
 		if (producer != nullptr) {
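
Since the token constructor allocates a producer slot, valid() gives callers a way to detect that failure without exceptions. A minimal usage sketch (queue and value are illustrative; assumes the header from this commit is included):

    tracy::moodycamel::ConcurrentQueue<int> q;
    tracy::moodycamel::ProducerToken tok(q);
    if (tok.valid()) {
        q.enqueue(tok, 17);  // token-based fast path into this producer's sub-queue
    }
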
@@ -350,8 +423,25 @@ struct ConsumerToken
 	template<typename T, typename Traits>
 	explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
 	
-	ConsumerToken(ConsumerToken&& other) MOODYCAMEL_DELETE_FUNCTION;
-	ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_DELETE_FUNCTION;
+	ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
+		: initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
+	{
+	}
+	
+	inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
+	{
+		swap(other);
+		return *this;
+	}
+	
+	void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
+	{
+		std::swap(initialOffset, other.initialOffset);
+		std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
+		std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
+		std::swap(currentProducer, other.currentProducer);
+		std::swap(desiredProducer, other.desiredProducer);
+	}
 	
 	// Disable copying and assignment
 	ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
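
With the move operations defined instead of deleted, consumer tokens become storable in containers and movable across scopes. A sketch under that assumption (needs <vector>; names are illustrative):

    tracy::moodycamel::ConcurrentQueue<int> q;
    std::vector<tracy::moodycamel::ConsumerToken> tokens;
    tokens.push_back(tracy::moodycamel::ConsumerToken(q));  // move-constructed into place

    int item;
    bool got = q.try_dequeue(tokens[0], item);  // token-based consumer fast path
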
@@ -385,6 +475,15 @@ class ConcurrentQueue
 	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
 	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
 	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable: 4307)		// + integral constant overflow (that's what the ternary expression is for!)
+#pragma warning(disable: 4309)		// static_cast: Truncation of constant value
+#endif
+	static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
 	
 	static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
 	static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
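
The guard expression is dense, so a worked instance may help. Assuming BLOCK_SIZE is 32 and Traits::MAX_SUBQUEUE_SIZE is 100 (both values illustrative):

    // (100 + (32 - 1)) / 32 * 32  ==  131 / 32 * 32  ==  4 * 32  ==  128
    // The effective cap becomes 128, the next multiple of BLOCK_SIZE at or above 100.
    // The ternary's first branch keeps the cap at const_numeric_max<size_t>::value
    // whenever Traits::MAX_SUBQUEUE_SIZE is within one block of SIZE_MAX, where
    // rounding up would otherwise wrap around (hence the silenced MSVC warnings).
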
@@ -583,6 +682,20 @@ class ConcurrentQueue
 		}
 		return size;
 	}
+	
+	
+	// Returns true if the underlying atomic variables used by
+	// the queue are lock-free (they should be on most platforms).
+	// Thread-safe.
+	static bool is_lock_free()
+	{
+		return
+			details::static_is_lock_free<bool>::value == 2 &&
+			details::static_is_lock_free<size_t>::value == 2 &&
+			details::static_is_lock_free<std::uint32_t>::value == 2 &&
+			details::static_is_lock_free<index_t>::value == 2 &&
+			details::static_is_lock_free<void*>::value == 2;
+	}
 	
 	
 private:
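
is_lock_free() folds the per-type checks into a single answer. A hedged sketch of gating on it at startup (the warning text and check are illustrative, not from this commit):

    #include <cstdio>

    int main()
    {
        if (!tracy::moodycamel::ConcurrentQueue<int>::is_lock_free()) {
            // The queue still functions here, but its atomics may be lock-emulated.
            std::fprintf(stderr, "warning: queue atomics are not lock-free on this platform\n");
        }
    }
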
@@ -657,6 +770,7 @@ class ConcurrentQueue
 	{
 		FreeList() : freeListHead(nullptr) { }
 		FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
+		void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
 		
 		FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
 		FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
@@ -781,6 +895,23 @@ class ConcurrentQueue
 			}
 		}
 		
+		// Returns true if the block is now empty (does not apply in explicit context)
+		inline bool set_empty(index_t i)
+		{
+			if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+				// Set flag
+				assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
+				emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
+				return false;
+			}
+			else {
+				// Increment counter
+				auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
+				assert(prevVal < BLOCK_SIZE);
+				return prevVal == BLOCK_SIZE - 1;
+			}
+		}
+		
 		// Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
 		// Returns true if the block is now empty (does not apply in explicit context).
 		inline bool set_many_empty(index_t i, size_t count)
@@ -803,6 +934,20 @@ class ConcurrentQueue
 			}
 		}
 		
+		inline void set_all_empty()
+		{
+			if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+				// Set all flags
+				for (size_t i = 0; i != BLOCK_SIZE; ++i) {
+					emptyFlags[i].store(true, std::memory_order_relaxed);
+				}
+			}
+			else {
+				// Reset counter
+				elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
+			}
+		}
+		
 		inline void reset_empty()
 		{
 			if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
@@ -1043,6 +1188,68 @@ class ConcurrentQueue
 			return this->tailIndex;
 		}
 		
+		template<typename It>
+		size_t dequeue_bulk(It& itemFirst, size_t max)
+		{
+			auto tail = this->tailIndex.load(std::memory_order_relaxed);
+			auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
+			auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
+			if (details::circular_less_than<size_t>(0, desiredCount)) {
+				desiredCount = desiredCount < max ? desiredCount : max;
+				std::atomic_thread_fence(std::memory_order_acquire);
+				
+				auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
+				assert(overcommit <= myDequeueCount);
+				
+				tail = this->tailIndex.load(std::memory_order_acquire);
+				auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
+				if (details::circular_less_than<size_t>(0, actualCount)) {
+					actualCount = desiredCount < actualCount ? desiredCount : actualCount;
+					if (actualCount < desiredCount) {
+						this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
+					}
+					
+					// Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
+					// will never exceed tail.
+					auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
+					
+					// Determine which block the first element is in
+					auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
+					auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
+					
+					auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
+					auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
+					auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
+					auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
+					
+					// Iterate the blocks and dequeue
+					auto index = firstIndex;
+					do {
+						auto firstIndexInBlock = index;
+						auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
+						endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
+						auto block = localBlockIndex->entries[indexIndex].block;
+						
+						const auto sz = endIndex - index;
+						memcpy(itemFirst, (*block)[index], sizeof(T) * sz);
+						index += sz;
+						itemFirst += sz;
+						
+						block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
+						indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
+					} while (index != firstIndex + actualCount);
+					
+					return actualCount;
+				}
+				else {
+					// Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
+					this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
+				}
+			}
+			
+			return 0;
+		}
+		
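
This bulk path copies whole block spans with a single memcpy per block, which is only sound because this fork assumes trivially copyable elements and raw-pointer output iterators. A sketch of the public entry point that would route here (assuming the usual try_dequeue_bulk wrapper, which this hunk does not show):

    tracy::moodycamel::ConcurrentQueue<int> q;
    for (int i = 0; i != 1000; ++i) q.enqueue(i);

    int buf[256];
    size_t n = q.try_dequeue_bulk(buf, 256);  // dequeues up to 256 items in block-sized chunks
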
 	private:
 		struct BlockIndexEntry
 		{
@@ -1153,6 +1360,15 @@ class ConcurrentQueue
 		freeList.add(block);
 	}
 	
+	inline void add_blocks_to_free_list(Block* block)
+	{
+		while (block != nullptr) {
+			auto next = block->next;
+			add_block_to_free_list(block);
+			block = next;
+		}
+	}
+	
 	inline Block* try_get_block_from_free_list()
 	{
 		return freeList.try_get();
@@ -1177,9 +1393,15 @@ class ConcurrentQueue
 	
 	//////////////////////////////////
 	// Producer list manipulation
-	//////////////////////////////////
+	//////////////////////////////////
 	
-	ProducerBase* recycle_or_create_producer()
+	ProducerBase* recycle_or_create_producer()
+	{
+		bool recycled;
+		return recycle_or_create_producer(recycled);
+	}
+	
+	ProducerBase* recycle_or_create_producer(bool& recycled)
 	{
 		// Try to re-use one first
 		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
@@ -1189,12 +1411,14 @@ class ConcurrentQueue
 					bool expected = true;
 					if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
 						// We caught one! It's been marked as activated, the caller can have it
+						recycled = true;
 						return ptr;
 					}
 				}
 			}
 		}
 		
+		recycled = false;
 		return add_producer(static_cast<ProducerBase*>(create<ExplicitProducer>(this)));
 	}
 	
@@ -1216,6 +1440,16 @@ class ConcurrentQueue
 		return producer;
 	}
 	
+	void reown_producers()
+	{
+		// After another instance is moved-into/swapped-with this one, all the
+		// producers we stole still think their parents are the other queue.
+		// So fix them up!
+		for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
+			ptr->parent = this;
+		}
+	}
+	
 	//////////////////////////////////
 	// Utility functions
 	//////////////////////////////////
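
reown_producers() is what makes whole-queue moves and swaps safe: the stolen producers' parent pointers must be re-targeted, or they would keep touching the source queue. A sketch of the user-visible operation that relies on it (assuming the queue's move constructor calls reown_producers, which is outside the hunks shown here):

    tracy::moodycamel::ConcurrentQueue<int> a;
    a.enqueue(1);
    tracy::moodycamel::ConcurrentQueue<int> b(std::move(a));  // b's producers now report b as parent
    int item;
    bool got = b.try_dequeue(item);  // got == true, item == 1
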
@@ -1293,6 +1527,22 @@ ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
 	lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
 }
 
+template<typename T, typename Traits>
+inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
+{
+	a.swap(b);
+}
+
+inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
+{
+	a.swap(b);
+}
+
+inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
+{
+	a.swap(b);
+}
+
 }
 
 } /* namespace tracy */
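
These free swap overloads exist so the standard two-step idiom picks them up via argument-dependent lookup rather than falling back to std::swap's generic move-based default. A short sketch (queues and tokens are illustrative):

    tracy::moodycamel::ConcurrentQueue<int> q1, q2;
    tracy::moodycamel::ProducerToken t1(q1), t2(q2);
    using std::swap;
    swap(t1, t2);  // ADL selects the ProducerToken overload above
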