Index: libs/libmythtv/DeviceReadBuffer.h =================================================================== --- libs/libmythtv/DeviceReadBuffer.h (revision 18515) +++ libs/libmythtv/DeviceReadBuffer.h (working copy) @@ -59,7 +59,7 @@ void IncrReadPointer(uint len); bool HandlePausing(void); - bool Poll(void) const; + bool Poll(void); uint WaitForUnused(uint bytes_needed) const; uint WaitForUsed (uint bytes_needed) const; Index: libs/libmythtv/DeviceReadBuffer.cpp =================================================================== --- libs/libmythtv/DeviceReadBuffer.cpp (revision 18515) +++ libs/libmythtv/DeviceReadBuffer.cpp (working copy) @@ -90,9 +90,14 @@ void DeviceReadBuffer::Start(void) { - lock.lock(); - bool was_running = running; - lock.unlock(); + bool was_running; + + { + QMutexLocker locker(&lock); + was_running = running; + error = false; + } + if (was_running) { VERBOSE(VB_IMPORTANT, LOC_ERR + "Start(): Already running."); @@ -104,6 +109,8 @@ { VERBOSE(VB_IMPORTANT, LOC_ERR + QString("Start(): pthread_create failed.") + ENO); + + QMutexLocker locker(&lock); error = true; } } @@ -118,21 +125,25 @@ used = 0; readPtr = buffer; writePtr = buffer; + + error = false; } void DeviceReadBuffer::Stop(void) { bool was_running = IsRunning(); - lock.lock(); - run = false; - lock.unlock(); if (!was_running) { - VERBOSE(VB_IMPORTANT, LOC_ERR + "Stop(): Not running."); + VERBOSE(VB_IMPORTANT, LOC + "Stop(): Not running."); return; } + { + QMutexLocker locker(&lock); + run = false; + } + pthread_join(thread, NULL); } @@ -229,10 +240,11 @@ { uint errcnt = 0; - lock.lock(); - run = true; - running = true; - lock.unlock(); + { + QMutexLocker locker(&lock); + run = true; + running = true; + } while (run) { @@ -248,6 +260,15 @@ if (using_poll && !Poll()) continue; + { + QMutexLocker locker(&lock); + if (error) + { + VERBOSE(VB_RECORD, LOC + "fill_ringbuffer: error state"); + break; + } + } + // Limit read size for faster return from read size_t read_size = min(dev_read_size, (size_t) WaitForUnused(TSPacket::SIZE)); @@ -268,9 +289,10 @@ } } - lock.lock(); - running = false; - lock.unlock(); + { + QMutexLocker locker(&lock); + running = false; + } } bool DeviceReadBuffer::HandlePausing(void) @@ -293,7 +315,7 @@ return true; } -bool DeviceReadBuffer::Poll(void) const +bool DeviceReadBuffer::Poll(void) { #ifdef USING_MINGW #warning mingw DeviceReadBuffer::Poll @@ -302,31 +324,53 @@ return false; #else bool retval = true; - while (true) + uint timeout_cnt = 0; + + for (;;) { struct pollfd polls; polls.fd = _stream_fd; polls.events = POLLIN; polls.revents = 0; - int ret = poll(&polls, 1 /*number of polls*/, 10 /*msec*/); - if (IsPauseRequested() || !IsOpen() || !run) + int ret = poll(&polls, 1 /*number of polls*/, 250 /*msec*/); + + if (polls.revents & (POLLERR | POLLHUP | POLLNVAL)) { + VERBOSE(VB_IMPORTANT, LOC + "poll error"); + error = true; + return true; + } + + if (!run || !IsOpen() || IsPauseRequested()) + { retval = false; break; // are we supposed to pause, stop, etc. } if (ret > 0) break; // we have data to read :) - if ((-1 == ret) && (EOVERFLOW == errno)) - break; // we have an error to handle - - if ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))) - continue; // errors that tell you to try again - if (ret == 0) - continue; // timed out, try again - - usleep(2500); + if (ret < 0) + { + if ((EOVERFLOW == errno)) + break; // we have an error to handle + if ((EAGAIN == errno) || (EINTR == errno)) + continue; // errors that tell you to try again + usleep(2500); + } + else // ret == 0 + { + if (++timeout_cnt > 9) + { + VERBOSE(VB_RECORD, LOC_ERR + "Poll giving up"); + QMutexLocker locker(&lock); + error = true; + return true; + } + if (timeout_cnt % 2) + VERBOSE(VB_RECORD, LOC_ERR + QString("Poll timeout (%1)") + .arg(timeout_cnt)); + } } return retval; #endif //!USING_MINGW @@ -360,9 +404,9 @@ if (++errcnt > 5) { - lock.lock(); + QMutexLocker locker(&lock); + VERBOSE(VB_RECORD, LOC + "Too many errors."); error = true; - lock.unlock(); return false; } @@ -376,10 +420,8 @@ VERBOSE(VB_IMPORTANT, LOC + QString("End-Of-File? fd(%1)").arg(_stream_fd)); - lock.lock(); + QMutexLocker locker(&lock); eof = true; - lock.unlock(); - return false; } usleep(500);