Clean up POSIX streaming code

This commit is contained in:
erorcun 2021-01-27 18:26:08 +03:00
parent e8a10748a4
commit ab939e059b
6 changed files with 92 additions and 47 deletions

View File

@ -43,6 +43,6 @@ char *CdStreamGetImageName(int32 cd);
void CdStreamRemoveImages(void); void CdStreamRemoveImages(void);
int32 CdStreamGetNumImages(void); int32 CdStreamGetNumImages(void);
#ifndef _WIN32 #ifdef FLUSHABLE_STREAMING
extern bool flushStream[MAX_CDCHANNELS]; extern bool flushStream[MAX_CDCHANNELS];
#endif #endif

View File

@ -21,9 +21,9 @@
#define CDDEBUG(f, ...) debug ("%s: " f "\n", "cdvd_stream", ## __VA_ARGS__) #define CDDEBUG(f, ...) debug ("%s: " f "\n", "cdvd_stream", ## __VA_ARGS__)
#define CDTRACE(f, ...) printf("%s: " f "\n", "cdvd_stream", ## __VA_ARGS__) #define CDTRACE(f, ...) printf("%s: " f "\n", "cdvd_stream", ## __VA_ARGS__)
// #define ONE_THREAD_PER_CHANNEL // Don't use if you're not on SSD/Flash. (Also you may want to benefit from this via using all channels in Streaming.cpp) #ifdef FLUSHABLE_STREAMING
bool flushStream[MAX_CDCHANNELS]; bool flushStream[MAX_CDCHANNELS];
#endif
struct CdReadInfo struct CdReadInfo
{ {
@ -99,6 +99,7 @@ CdStreamInitThread(void)
ASSERT(0); ASSERT(0);
return; return;
} }
#ifdef ONE_THREAD_PER_CHANNEL #ifdef ONE_THREAD_PER_CHANNEL
sprintf(semName,"/semaphore_start%d",i); sprintf(semName,"/semaphore_start%d",i);
gpReadInfo[i].pStartSemaphore = sem_open(semName, O_CREAT, 0644, 1); gpReadInfo[i].pStartSemaphore = sem_open(semName, O_CREAT, 0644, 1);
@ -245,10 +246,12 @@ CdStreamRead(int32 channel, void *buffer, uint32 offset, uint32 size)
if ( pChannel->nSectorsToRead != 0 || pChannel->bReading ) { if ( pChannel->nSectorsToRead != 0 || pChannel->bReading ) {
if (pChannel->hFile == hImage - 1 && pChannel->nSectorOffset == _GET_OFFSET(offset) && pChannel->nSectorsToRead >= size) if (pChannel->hFile == hImage - 1 && pChannel->nSectorOffset == _GET_OFFSET(offset) && pChannel->nSectorsToRead >= size)
return STREAM_SUCCESS; return STREAM_SUCCESS;
#ifdef FLUSHABLE_STREAMING
flushStream[channel] = 1; flushStream[channel] = 1;
CdStreamSync(channel); CdStreamSync(channel);
//return STREAM_NONE; #else
return STREAM_NONE;
#endif
} }
pChannel->hFile = hImage - 1; pChannel->hFile = hImage - 1;
@ -316,34 +319,34 @@ CdStreamSync(int32 channel)
CdReadInfo *pChannel = &gpReadInfo[channel]; CdReadInfo *pChannel = &gpReadInfo[channel];
ASSERT( pChannel != nil ); ASSERT( pChannel != nil );
#ifdef FLUSHABLE_STREAMING
if (flushStream[channel]) { if (flushStream[channel]) {
#ifdef ONE_THREAD_PER_CHANNEL
pChannel->nSectorsToRead = 0; pChannel->nSectorsToRead = 0;
#ifdef ONE_THREAD_PER_CHANNEL
pthread_kill(pChannel->pChannelThread, SIGUSR1); pthread_kill(pChannel->pChannelThread, SIGUSR1);
if (pChannel->bReading) { if (pChannel->bReading) {
pChannel->bLocked = true; pChannel->bLocked = true;
while (pChannel->bLocked)
sem_wait(pChannel->pDoneSemaphore);
}
#else #else
pChannel->nSectorsToRead = 0;
if (pChannel->bReading) { if (pChannel->bReading) {
pChannel->bLocked = true; pChannel->bLocked = true;
pthread_kill(_gCdStreamThread, SIGUSR1); pthread_kill(_gCdStreamThread, SIGUSR1);
#endif
while (pChannel->bLocked) while (pChannel->bLocked)
sem_wait(pChannel->pDoneSemaphore); sem_wait(pChannel->pDoneSemaphore);
} }
#endif
pChannel->bReading = false; pChannel->bReading = false;
flushStream[channel] = false; flushStream[channel] = false;
return STREAM_NONE; return STREAM_NONE;
} }
#endif
if ( pChannel->nSectorsToRead != 0 ) if ( pChannel->nSectorsToRead != 0 )
{ {
pChannel->bLocked = true; pChannel->bLocked = true;
while (pChannel->bLocked) while (pChannel->bLocked && pChannel->nSectorsToRead != 0){
sem_wait(pChannel->pDoneSemaphore); sem_wait(pChannel->pDoneSemaphore);
}
pChannel->bLocked = false;
} }
pChannel->bReading = false; pChannel->bReading = false;
@ -447,7 +450,7 @@ void *CdStreamThread(void *param)
if ( pChannel->bLocked ) if ( pChannel->bLocked )
{ {
pChannel->bLocked = 0; pChannel->bLocked = 0;
sem_post(pChannel->pDoneSemaphore); sem_post(pChannel->pDoneSemaphore);
} }
pChannel->bReading = false; pChannel->bReading = false;
} }
@ -524,7 +527,9 @@ void
CdStreamRemoveImages(void) CdStreamRemoveImages(void)
{ {
for ( int32 i = 0; i < gNumChannels; i++ ) { for ( int32 i = 0; i < gNumChannels; i++ ) {
#ifdef FLUSHABLE_STREAMING
flushStream[i] = 1; flushStream[i] = 1;
#endif
CdStreamSync(i); CdStreamSync(i);
} }

View File

@ -207,11 +207,15 @@ CStreaming::Init2(void)
// allocate streaming buffers // allocate streaming buffers
if(ms_streamingBufferSize & 1) ms_streamingBufferSize++; if(ms_streamingBufferSize & 1) ms_streamingBufferSize++;
#ifndef ONE_THREAD_PER_CHANNEL
ms_pStreamingBuffer[0] = (int8*)RwMallocAlign(ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE, CDSTREAM_SECTOR_SIZE); ms_pStreamingBuffer[0] = (int8*)RwMallocAlign(ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE, CDSTREAM_SECTOR_SIZE);
ms_streamingBufferSize /= 2; ms_streamingBufferSize /= 2;
ms_pStreamingBuffer[1] = ms_pStreamingBuffer[0] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE; ms_pStreamingBuffer[1] = ms_pStreamingBuffer[0] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE;
#ifdef ONE_THREAD_PER_CHANNEL #else
ms_pStreamingBuffer[2] = (int8*)RwMallocAlign(ms_streamingBufferSize*2*CDSTREAM_SECTOR_SIZE, CDSTREAM_SECTOR_SIZE); ms_pStreamingBuffer[0] = (int8*)RwMallocAlign(ms_streamingBufferSize*2*CDSTREAM_SECTOR_SIZE, CDSTREAM_SECTOR_SIZE);
ms_streamingBufferSize /= 2;
ms_pStreamingBuffer[1] = ms_pStreamingBuffer[0] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE;
ms_pStreamingBuffer[2] = ms_pStreamingBuffer[1] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE;
ms_pStreamingBuffer[3] = ms_pStreamingBuffer[2] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE; ms_pStreamingBuffer[3] = ms_pStreamingBuffer[2] + ms_streamingBufferSize*CDSTREAM_SECTOR_SIZE;
#endif #endif
debug("Streaming buffer size is %d sectors", ms_streamingBufferSize); debug("Streaming buffer size is %d sectors", ms_streamingBufferSize);
@ -2305,9 +2309,10 @@ CStreaming::LoadRequestedModels(void)
} }
// Let's load models first, then process it. Unfortunately processing models are still single-threaded. // Let's load models in 4 threads; when one of them becomes idle, process the file, and fill thread with another file. Unfortunately processing models are still single-threaded.
// Currently only supported on POSIX streamer. // Currently only supported on POSIX streamer.
#ifdef ONE_THREAD_PER_CHANNEL // WIP - some files are loaded swapped (CdStreamPosix problem?)
#if 0 //def ONE_THREAD_PER_CHANNEL
void void
CStreaming::LoadAllRequestedModels(bool priority) CStreaming::LoadAllRequestedModels(bool priority)
{ {
@ -2326,14 +2331,18 @@ CStreaming::LoadAllRequestedModels(bool priority)
int streamIds[ARRAY_SIZE(ms_pStreamingBuffer)]; int streamIds[ARRAY_SIZE(ms_pStreamingBuffer)];
int streamSizes[ARRAY_SIZE(ms_pStreamingBuffer)]; int streamSizes[ARRAY_SIZE(ms_pStreamingBuffer)];
int streamPoses[ARRAY_SIZE(ms_pStreamingBuffer)]; int streamPoses[ARRAY_SIZE(ms_pStreamingBuffer)];
bool first = true; int readOrder[4] = {-1}; // Channel IDs ordered by read time
int readI = 0;
int processI = 0; int processI = 0;
bool first = true;
// All those "first" checks are because of variables aren't initialized in first pass.
while (true) { while (true) {
// Enumerate files and start reading
for (int i=0; i<ARRAY_SIZE(ms_pStreamingBuffer); i++) { for (int i=0; i<ARRAY_SIZE(ms_pStreamingBuffer); i++) {
// Channel has file to load
if (!first && streamIds[i] != -1) { if (!first && streamIds[i] != -1) {
processI = i;
continue; continue;
} }
@ -2346,12 +2355,16 @@ CStreaming::LoadAllRequestedModels(bool priority)
if (ms_aInfoForModel[streamId].GetCdPosnAndSize(posn, size)) { if (ms_aInfoForModel[streamId].GetCdPosnAndSize(posn, size)) {
streamIds[i] = -1; streamIds[i] = -1;
// Big file, needs 2 buffer
if (size > (uint32)ms_streamingBufferSize) { if (size > (uint32)ms_streamingBufferSize) {
if (i + 1 == ARRAY_SIZE(ms_pStreamingBuffer)) if (i + 1 == ARRAY_SIZE(ms_pStreamingBuffer))
continue; break;
else if (!first && streamIds[i+1] != -1) else if (!first && streamIds[i+1] != -1)
continue; continue;
} else { } else {
// Buffer of current channel is part of a "big file", pass
if (i != 0 && streamIds[i-1] != -1 && streamSizes[i-1] > (uint32)ms_streamingBufferSize) if (i != 0 && streamIds[i-1] != -1 && streamSizes[i-1] > (uint32)ms_streamingBufferSize)
continue; continue;
} }
@ -2361,8 +2374,18 @@ CStreaming::LoadAllRequestedModels(bool priority)
streamIds[i] = streamId; streamIds[i] = streamId;
streamSizes[i] = size; streamSizes[i] = size;
streamPoses[i] = posn; streamPoses[i] = posn;
if (!first)
assert(readOrder[readI] == -1);
//printf("read: order %d, ch %d, id %d, size %d\n", readI, i, streamId, size);
CdStreamRead(i, ms_pStreamingBuffer[i], imgOffset+posn, size); CdStreamRead(i, ms_pStreamingBuffer[i], imgOffset+posn, size);
processI = i; readOrder[readI] = i;
if (first && readI+1 != ARRAY_SIZE(readOrder))
readOrder[readI+1] = -1;
readI = (readI + 1) % ARRAY_SIZE(readOrder);
} else { } else {
ms_aInfoForModel[streamId].RemoveFromList(); ms_aInfoForModel[streamId].RemoveFromList();
DecrementRef(streamId); DecrementRef(streamId);
@ -2370,33 +2393,40 @@ CStreaming::LoadAllRequestedModels(bool priority)
ms_aInfoForModel[streamId].m_loadState = STREAMSTATE_LOADED; ms_aInfoForModel[streamId].m_loadState = STREAMSTATE_LOADED;
streamIds[i] = -1; streamIds[i] = -1;
} }
} else } else {
streamIds[i] = -1; streamIds[i] = -1;
break;
}
} }
first = false; first = false;
int nextChannel = readOrder[processI];
// Now process // Now start processing
if (streamIds[processI] == -1) if (nextChannel == -1 || streamIds[nextChannel] == -1)
break; break;
// Try again on error //printf("process: order %d, ch %d, id %d\n", processI, nextChannel, streamIds[nextChannel]);
while (CdStreamSync(processI) != STREAM_NONE) {
CdStreamRead(processI, ms_pStreamingBuffer[processI], imgOffset+streamPoses[processI], streamSizes[processI]);
}
ms_aInfoForModel[streamIds[processI]].m_loadState = STREAMSTATE_READING;
MakeSpaceFor(streamSizes[processI] * CDSTREAM_SECTOR_SIZE);
ConvertBufferToObject(ms_pStreamingBuffer[processI], streamIds[processI]);
if(ms_aInfoForModel[streamIds[processI]].m_loadState == STREAMSTATE_STARTED)
FinishLoadingLargeFile(ms_pStreamingBuffer[processI], streamIds[processI]);
if(streamIds[processI] < STREAM_OFFSET_TXD){ // Try again on error
CSimpleModelInfo *mi = (CSimpleModelInfo*)CModelInfo::GetModelInfo(streamIds[processI]); while (CdStreamSync(nextChannel) != STREAM_NONE) {
CdStreamRead(nextChannel, ms_pStreamingBuffer[nextChannel], imgOffset+streamPoses[nextChannel], streamSizes[nextChannel]);
}
ms_aInfoForModel[streamIds[nextChannel]].m_loadState = STREAMSTATE_READING;
MakeSpaceFor(streamSizes[nextChannel] * CDSTREAM_SECTOR_SIZE);
ConvertBufferToObject(ms_pStreamingBuffer[nextChannel], streamIds[nextChannel]);
if(ms_aInfoForModel[streamIds[nextChannel]].m_loadState == STREAMSTATE_STARTED)
FinishLoadingLargeFile(ms_pStreamingBuffer[nextChannel], streamIds[nextChannel]);
if(streamIds[nextChannel] < STREAM_OFFSET_TXD){
CSimpleModelInfo *mi = (CSimpleModelInfo*)CModelInfo::GetModelInfo(streamIds[nextChannel]);
if(mi->IsSimple()) if(mi->IsSimple())
mi->m_alpha = 255; mi->m_alpha = 255;
} }
streamIds[processI] = -1; streamIds[nextChannel] = -1;
readOrder[processI] = -1;
processI = (processI + 1) % ARRAY_SIZE(readOrder);
} }
ms_bLoadingBigModel = false; ms_bLoadingBigModel = false;
@ -2443,7 +2473,7 @@ CStreaming::LoadAllRequestedModels(bool priority)
status = CdStreamRead(0, ms_pStreamingBuffer[0], imgOffset+posn, size); status = CdStreamRead(0, ms_pStreamingBuffer[0], imgOffset+posn, size);
while(CdStreamSync(0) || status == STREAM_NONE); while(CdStreamSync(0) || status == STREAM_NONE);
ms_aInfoForModel[streamId].m_loadState = STREAMSTATE_READING; ms_aInfoForModel[streamId].m_loadState = STREAMSTATE_READING;
MakeSpaceFor(size * CDSTREAM_SECTOR_SIZE); MakeSpaceFor(size * CDSTREAM_SECTOR_SIZE);
ConvertBufferToObject(ms_pStreamingBuffer[0], streamId); ConvertBufferToObject(ms_pStreamingBuffer[0], streamId);
if(ms_aInfoForModel[streamId].m_loadState == STREAMSTATE_STARTED) if(ms_aInfoForModel[streamId].m_loadState == STREAMSTATE_STARTED)
@ -2500,7 +2530,7 @@ CStreaming::FlushRequestList(void)
next = si->m_next; next = si->m_next;
RemoveModel(si - ms_aInfoForModel); RemoveModel(si - ms_aInfoForModel);
} }
#ifndef _WIN32 #ifdef FLUSHABLE_STREAMING
if(ms_channel[0].state == CHANNELSTATE_READING) { if(ms_channel[0].state == CHANNELSTATE_READING) {
flushStream[0] = 1; flushStream[0] = 1;
} }
@ -3216,4 +3246,4 @@ CStreaming::PrintStreamingBufferState()
DoRWStuffEndOfFrame(); DoRWStuffEndOfFrame();
} }
CTimer::Update(); CTimer::Update();
} }

View File

@ -88,7 +88,11 @@ public:
static int32 ms_oldSectorX; static int32 ms_oldSectorX;
static int32 ms_oldSectorY; static int32 ms_oldSectorY;
static int32 ms_streamingBufferSize; static int32 ms_streamingBufferSize;
#ifndef ONE_THREAD_PER_CHANNEL
static int8 *ms_pStreamingBuffer[2]; static int8 *ms_pStreamingBuffer[2];
#else
static int8 *ms_pStreamingBuffer[4];
#endif
static size_t ms_memoryUsed; static size_t ms_memoryUsed;
static CStreamingChannel ms_channel[2]; static CStreamingChannel ms_channel[2];
static int32 ms_channelError; static int32 ms_channelError;

View File

@ -393,11 +393,12 @@ static_assert(false, "SUPPORT_XBOX_SCRIPT and SUPPORT_MOBILE_SCRIPT are mutually
#endif #endif
#ifdef LIBRW // Streaming
// these are not supported with librw yet #if !defined(_WIN32) && !defined(__SWITCH__)
//#define ONE_THREAD_PER_CHANNEL // Don't use if you're not on SSD/Flash - also not utilized too much right now(see commented LoadAllRequestedModels in Streaming.cpp)
#define FLUSHABLE_STREAMING // Make it possible to interrupt reading when processing file isn't needed anymore.
#endif #endif
// IMG #define BIG_IMG // Not complete - allows to read larger img files
#define BIG_IMG // allows to read larger img files
//#define SQUEEZE_PERFORMANCE //#define SQUEEZE_PERFORMANCE
#ifdef SQUEEZE_PERFORMANCE #ifdef SQUEEZE_PERFORMANCE
@ -405,6 +406,8 @@ static_assert(false, "SUPPORT_XBOX_SCRIPT and SUPPORT_MOBILE_SCRIPT are mutually
#undef NO_ISLAND_LOADING #undef NO_ISLAND_LOADING
#endif #endif
// -------
#if defined __MWERKS__ || defined VANILLA_DEFINES #if defined __MWERKS__ || defined VANILLA_DEFINES
#define FINAL #define FINAL
#undef CHATTYSPLASH #undef CHATTYSPLASH

View File

@ -1271,10 +1271,11 @@ void terminateHandler(int sig, siginfo_t *info, void *ucontext) {
RsGlobal.quit = TRUE; RsGlobal.quit = TRUE;
} }
#ifdef FLUSHABLE_STREAMING
void dummyHandler(int sig){ void dummyHandler(int sig){
// Don't kill the app pls // Don't kill the app pls
} }
#endif
#endif #endif
void resizeCB(GLFWwindow* window, int width, int height) { void resizeCB(GLFWwindow* window, int width, int height) {
@ -1528,11 +1529,13 @@ main(int argc, char *argv[])
act.sa_sigaction = terminateHandler; act.sa_sigaction = terminateHandler;
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
#ifdef FLUSHABLE_STREAMING
struct sigaction sa; struct sigaction sa;
sigemptyset(&sa.sa_mask); sigemptyset(&sa.sa_mask);
sa.sa_handler = dummyHandler; sa.sa_handler = dummyHandler;
sa.sa_flags = 0; sa.sa_flags = 0;
sigaction(SIGUSR1, &sa, NULL); // Needed for CdStreamPosix sigaction(SIGUSR1, &sa, NULL);
#endif
#endif #endif
/* /*