2024-08-16 23:33:48 +08:00
/// \file
/// \brief \b [Internal] Passes queued data between threads using a circular buffer with read and write pointers
///
/// This file is part of RakNet Copyright 2003 Kevin Jenkins.
///
/// Usage of RakNet is subject to the appropriate license agreement.
/// Creative Commons Licensees are subject to the
/// license found at
/// http://creativecommons.org/licenses/by-nc/2.5/
/// Single application licensees are subject to the license found at
/// http://www.rakkarsoft.com/SingleApplicationLicense.html
/// Custom license users are subject to the terms therein.
/// GPL license users are subject to the GNU General Public
/// License as published by the Free
/// Software Foundation; either version 2 of the License, or (at your
/// option) any later version.
2024-02-13 23:06:55 +08:00
# ifndef __SINGLE_PRODUCER_CONSUMER_H
# define __SINGLE_PRODUCER_CONSUMER_H
# include <assert.h>
/// Number of nodes the SingleProducerConsumer constructor preallocates for its circular list,
/// and the size that Clear() shrinks the list back down to.
/// The constructor asserts (debug builds only) that this value is at least 3.
static const int MINIMUM_LIST_SIZE = 8 ;
# include "Export.h"
/// The namespace DataStructures was only added to avoid compiler errors for commonly named data structures
/// As these data structures are stand-alone, you can use them outside of RakNet for your own projects if you wish.
namespace DataStructures
{
/// \brief A single producer consumer implementation without critical sections.
template < class SingleProducerConsumerType >
class RAK_DLL_EXPORT SingleProducerConsumer
{
public :
/// Constructor
SingleProducerConsumer ( ) ;
2024-08-16 23:33:48 +08:00
/// Destructor
~ SingleProducerConsumer ( ) ;
/// WriteLock must be immediately followed by WriteUnlock. These two functions must be called in the same thread.
/// \return A pointer to a block of data you can write to.
SingleProducerConsumerType * WriteLock ( void ) ;
/// Call if you don't want to write to a block of data from WriteLock() after all.
/// Cancelling locks cancels all locks back up to the data passed. So if you lock twice and cancel using the first lock, the second lock is ignored
/// \param[in] cancelToLocation Which WriteLock() to cancel.
void CancelWriteLock ( SingleProducerConsumerType * cancelToLocation ) ;
/// Call when you are done writing to a block of memory returned by WriteLock()
void WriteUnlock ( void ) ;
2024-02-13 23:06:55 +08:00
/// ReadLock must be immediately followed by ReadUnlock. These two functions must be called in the same thread.
/// \retval 0 No data is availble to read
/// \retval Non-zero The data previously written to, in another thread, by WriteLock followed by WriteUnlock.
SingleProducerConsumerType * ReadLock ( void ) ;
2024-08-16 23:33:48 +08:00
// Cancelling locks cancels all locks back up to the data passed. So if you lock twice and cancel using the first lock, the second lock is ignored
/// param[in] Which ReadLock() to cancel.
void CancelReadLock ( SingleProducerConsumerType * cancelToLocation ) ;
2024-02-13 23:06:55 +08:00
/// Signals that we are done reading the the data from the least recent call of ReadLock.
/// At this point that pointer is no longer valid, and should no longer be read.
void ReadUnlock ( void ) ;
2024-08-16 23:33:48 +08:00
/// Clear is not thread-safe and none of the lock or unlock functions should be called while it is running.
void Clear ( void ) ;
/// This function will estimate how many elements are waiting to be read. It's threadsafe enough that the value returned is stable, but not threadsafe enough to give accurate results.
/// \return An ESTIMATE of how many data elements are waiting to be read
int Size ( void ) const ;
/// Make sure that the pointer we done reading for the call to ReadUnlock is the right pointer.
/// param[in] A previous pointer returned by ReadLock()
bool CheckReadUnlockOrder ( const SingleProducerConsumerType * data ) const ;
/// Returns if ReadUnlock was called before ReadLock
/// \return If the read is locked
bool ReadIsLocked ( void ) const ;
2024-02-13 23:06:55 +08:00
private :
struct DataPlusPtr
{
SingleProducerConsumerType object ;
// Ready to read is so we can use an equality boolean comparison, in case the writePointer var is trashed while context switching.
volatile bool readyToRead ;
volatile DataPlusPtr * next ;
} ;
volatile DataPlusPtr * readAheadPointer ;
volatile DataPlusPtr * writeAheadPointer ;
volatile DataPlusPtr * readPointer ;
volatile DataPlusPtr * writePointer ;
unsigned readCount , writeCount ;
} ;
/// Builds the initial circular list of MINIMUM_LIST_SIZE nodes and points all four
/// cursors (read, read-ahead, write, write-ahead) at its first node.
template <class SingleProducerConsumerType>
SingleProducerConsumer<SingleProducerConsumerType>::SingleProducerConsumer()
{
#ifdef _DEBUG
	assert(MINIMUM_LIST_SIZE >= 3);
#endif

	// Preallocate. Every node is explicitly marked "not ready to read" because
	// WriteLock() and ReadLock() test that flag before anything has been written.
	DataPlusPtr *first = new DataPlusPtr;
	first->readyToRead = false;

	DataPlusPtr *last = first;
	for (int listSize = 1; listSize < MINIMUM_LIST_SIZE; listSize++)
	{
		DataPlusPtr *node = new DataPlusPtr;
		node->readyToRead = false;
		last->next = node;
		last = node;
	}
	last->next = first; // last to next = start, closing the ring

	readPointer = first;
	writePointer = first;
	readAheadPointer = readPointer;
	writeAheadPointer = writePointer;
	readCount = writeCount = 0;
}
template < class SingleProducerConsumerType >
2024-08-16 23:33:48 +08:00
SingleProducerConsumer < SingleProducerConsumerType > : : ~ SingleProducerConsumer ( )
{
volatile DataPlusPtr * next ;
readPointer = writeAheadPointer - > next ;
while ( readPointer ! = writeAheadPointer )
{
next = readPointer - > next ;
delete ( char * ) readPointer ;
readPointer = next ;
}
delete ( char * ) readPointer ;
}
/// Reserves the node at the write-ahead cursor for the caller to fill in, growing the
/// ring by one node first if the following node is still owned by the reader.
/// \return A pointer to the payload of the reserved node.
template <class SingleProducerConsumerType>
SingleProducerConsumerType *SingleProducerConsumer<SingleProducerConsumerType>::WriteLock(void)
{
	if (writeAheadPointer->next == readPointer ||
		writeAheadPointer->next->readyToRead == true)
	{
		// The next node is either the one being read or still waiting to be read, so splice in a fresh one.
		// The node is fully initialised (flag and link) before it becomes reachable from the ring.
		DataPlusPtr *inserted = new DataPlusPtr;
		assert(inserted);
		inserted->readyToRead = false;
		inserted->next = writeAheadPointer->next;
		writeAheadPointer->next = inserted;
	}

	volatile DataPlusPtr *last = writeAheadPointer;
	writeAheadPointer = writeAheadPointer->next;

	// Hand out the payload stored in the reserved node
	return const_cast<SingleProducerConsumerType *>(&last->object);
}
/// Rewinds the write-ahead cursor to the node that holds \a cancelToLocation, so that node
/// and every node reserved after it are handed out again by later WriteLock() calls.
/// \param[in] cancelToLocation A pointer previously returned by WriteLock().
template <class SingleProducerConsumerType>
void SingleProducerConsumer<SingleProducerConsumerType>::CancelWriteLock(SingleProducerConsumerType *cancelToLocation)
{
	// The payload pointer is reinterpreted as a pointer to the node that contains it
	DataPlusPtr *const rewindTo = reinterpret_cast<DataPlusPtr *>(cancelToLocation);
	writeAheadPointer = rewindTo;
}
/// Publishes the oldest reserved node to the reader and advances the write cursor past it.
template <class SingleProducerConsumerType>
void SingleProducerConsumer<SingleProducerConsumerType>::WriteUnlock(void)
{
#ifdef _DEBUG
	assert(writePointer->next != readPointer);
	assert(writePointer != writeAheadPointer);
#endif

	++writeCount;

	// User is done with the data: flag it as readable first, and only then step the write pointer forward
	volatile DataPlusPtr *const finished = writePointer;
	finished->readyToRead = true;
	writePointer = finished->next;
}
/// Hands out the node at the read-ahead cursor, if it has been published, and advances that cursor.
/// \retval 0 The read-ahead cursor has caught up with the write cursor, or the node is not flagged as ready.
/// \retval Non-zero Pointer to the next element to read.
template <class SingleProducerConsumerType>
SingleProducerConsumerType *SingleProducerConsumer<SingleProducerConsumerType>::ReadLock(void)
{
	volatile DataPlusPtr *const candidate = readAheadPointer;

	if (candidate != writePointer && candidate->readyToRead != false)
	{
		readAheadPointer = candidate->next;
		return (SingleProducerConsumerType *) candidate;
	}

	// Nothing available
	return 0;
}
/// Rewinds the read-ahead cursor to the node that holds \a cancelToLocation, so that node
/// and every node locked after it are handed out again by later ReadLock() calls.
/// \param[in] cancelToLocation A pointer previously returned by ReadLock().
template <class SingleProducerConsumerType>
void SingleProducerConsumer<SingleProducerConsumerType>::CancelReadLock(SingleProducerConsumerType *cancelToLocation)
{
#ifdef _DEBUG
	assert(readPointer != writePointer);
#endif

	// The payload pointer is reinterpreted as a pointer to the node that contains it
	DataPlusPtr *const rewindTo = reinterpret_cast<DataPlusPtr *>(cancelToLocation);
	readAheadPointer = rewindTo;
}
/// Releases the oldest read-locked node back to the writer and advances the read cursor past it.
template <class SingleProducerConsumerType>
void SingleProducerConsumer<SingleProducerConsumerType>::ReadUnlock(void)
{
#ifdef _DEBUG
	assert(readAheadPointer != readPointer); // If hits, then called ReadUnlock before ReadLock
	assert(readPointer != writePointer); // If hits, then called ReadUnlock when Read returns 0
#endif

	++readCount;

	// Allow writes to this memory block: clear the flag first, and only then step the read pointer forward
	volatile DataPlusPtr *const consumed = readPointer;
	consumed->readyToRead = false;
	readPointer = consumed->next;
}
2024-08-16 23:33:48 +08:00
/// Discards everything queued, shrinks the ring back down to MINIMUM_LIST_SIZE nodes
/// (if it has grown beyond that) and resets all cursors and counters.
/// Not thread-safe: no lock or unlock function may run concurrently with it.
template <class SingleProducerConsumerType>
void SingleProducerConsumer<SingleProducerConsumerType>::Clear(void)
{
	// Count the nodes in the ring, starting with the one readPointer is on
	int listSize = 1;
	volatile DataPlusPtr *node;
	for (node = readPointer->next; node != readPointer; node = node->next)
		listSize++;

	// Shrink the list down to MINIMUM_LIST_SIZE elements by freeing the nodes that directly follow readPointer
	node = readPointer->next;
	while (listSize-- > MINIMUM_LIST_SIZE)
	{
		volatile DataPlusPtr *following = node->next;
#ifdef _DEBUG
		assert(node != readPointer);
#endif
		// Delete through the node's real type so the payload's destructor runs
		delete const_cast<DataPlusPtr *>(node);
		node = following;
	}
	readPointer->next = node;

	// Mark every surviving node as unwritten. A stale "ready" flag left ahead of the writer
	// would make WriteLock() insert a brand new node on every call from here on.
	node = readPointer;
	do
	{
		node->readyToRead = false;
		node = node->next;
	} while (node != readPointer);

	writePointer = readPointer;
	readAheadPointer = readPointer;
	writeAheadPointer = writePointer;
	readCount = writeCount = 0;
}
/// \return The number of WriteUnlock() calls minus the number of ReadUnlock() calls since
/// construction or the last Clear(). The counters are not synchronised, so treat it as an estimate.
template <class SingleProducerConsumerType>
int SingleProducerConsumer<SingleProducerConsumerType>::Size(void) const
{
	const unsigned pending = writeCount - readCount;
	return static_cast<int>(pending);
}
/// Checks whether \a data is the payload of the node at the read cursor.
/// \param[in] data A previous pointer returned by ReadLock()
/// \return true if \a data is the element the read cursor currently points at
template <class SingleProducerConsumerType>
bool SingleProducerConsumer<SingleProducerConsumerType>::CheckReadUnlockOrder(const SingleProducerConsumerType *data) const
{
	const volatile SingleProducerConsumerType *oldestLocked = &readPointer->object;
	return const_cast<const SingleProducerConsumerType *>(oldestLocked) == data;
}
2024-02-13 23:06:55 +08:00
2024-08-16 23:33:48 +08:00
/// \return true if the read-ahead cursor has moved past the read cursor, i.e. ReadLock() has
/// handed out data that has not yet been released or cancelled.
template <class SingleProducerConsumerType>
bool SingleProducerConsumer<SingleProducerConsumerType>::ReadIsLocked(void) const
{
	const bool cursorsCoincide = (readAheadPointer == readPointer);
	return !cursorsCoincide;
}
2024-02-13 23:06:55 +08:00
}
# endif
2024-08-16 23:33:48 +08:00
/*
# include "SingleProducerConsumer.h"
# include <process.h>
# include <assert.h>
# include <stdio.h>
# include <windows.h>
# include <math.h>
# include <stdlib.h>
# define READ_COUNT_ITERATIONS 10000000
DataStructures : : SingleProducerConsumer < unsigned long > spc ;
unsigned long readCount ;
unsigned __stdcall ProducerThread ( LPVOID arguments )
{
unsigned long producerCount ;
unsigned long * writeBlock ;
producerCount = 0 ;
while ( readCount < READ_COUNT_ITERATIONS )
{
writeBlock = spc . WriteLock ( ) ;
* writeBlock = producerCount ;
spc . WriteUnlock ( ) ;
producerCount + + ;
if ( ( producerCount % 1000000 ) = = 0 )
{
printf ( " WriteCount: %i. BufferSize=%i \n " , producerCount , spc . Size ( ) ) ;
}
}
printf ( " PRODUCER THREAD ENDED! \n " ) ;
return 0 ;
}
unsigned __stdcall ConsumerThread ( LPVOID arguments )
{
unsigned long * readBlock ;
while ( readCount < READ_COUNT_ITERATIONS )
{
if ( ( readBlock = spc . ReadLock ( ) ) ! = 0 )
{
if ( * readBlock ! = readCount )
{
printf ( " Test failed! Expected %i got %i! \n " , readCount , * readBlock ) ;
readCount = READ_COUNT_ITERATIONS ;
assert ( 0 ) ;
}
spc . ReadUnlock ( ) ;
readCount + + ;
if ( ( readCount % 1000000 ) = = 0 )
{
printf ( " ReadCount: %i. BufferSize=%i \n " , readCount , spc . Size ( ) ) ;
}
}
}
printf ( " CONSUMER THREAD ENDED! \n " ) ;
return 0 ;
}
void main ( void )
{
readCount = 0 ;
unsigned threadId1 = 0 ;
unsigned threadId2 = 0 ;
HANDLE thread1Handle , thread2Handle ;
unsigned long startTime = timeGetTime ( ) ;
thread1Handle = ( HANDLE ) _beginthreadex ( NULL , 0 , ProducerThread , 0 , 0 , & threadId1 ) ;
thread2Handle = ( HANDLE ) _beginthreadex ( NULL , 0 , ConsumerThread , 0 , 0 , & threadId2 ) ;
while ( readCount < READ_COUNT_ITERATIONS )
{
Sleep ( 0 ) ;
}
char str [ 256 ] ;
printf ( " Elapsed time = %i milliseconds. Press Enter to continue \n " , timeGetTime ( ) - startTime ) ;
gets ( str ) ;
}
*/