timermanager: Add some thread safety.

This commit is contained in:
joevt 2023-11-23 11:56:36 -08:00 committed by dingusdev
parent 833f74dce6
commit 703662cb5b
2 changed files with 46 additions and 17 deletions

View File

@ -99,12 +99,7 @@ uint32_t TimerManager::add_cyclic_timer(uint64_t interval, timer_cb cb) {
void TimerManager::cancel_timer(uint32_t id) void TimerManager::cancel_timer(uint32_t id)
{ {
TimerInfo* cur_timer = this->timer_queue.top().get(); this->timer_queue.remove_by_id(id);
if (cur_timer->id == id) {
this->timer_queue.pop();
} else {
this->timer_queue.remove_by_id(id);
}
if (!this->cb_active) { if (!this->cb_active) {
this->notify_timer_changes(); this->notify_timer_changes();
} }
@ -112,24 +107,27 @@ void TimerManager::cancel_timer(uint32_t id)
uint64_t TimerManager::process_timers() uint64_t TimerManager::process_timers()
{ {
TimerInfo* cur_timer; std::shared_ptr<TimerInfo> cur_timer;
uint64_t time_now = get_time_now(); uint64_t time_now = get_time_now();
{ // mtx scope
std::lock_guard<std::recursive_mutex> lk(this->timer_queue.get_mtx());
if (this->timer_queue.empty()) { if (this->timer_queue.empty()) {
return 0ULL; return 0ULL;
} }
// scan for expired timers // scan for expired timers
cur_timer = this->timer_queue.top().get(); cur_timer = this->timer_queue.top();
} // ] mtx scope
while (cur_timer->timeout_ns <= time_now) { while (cur_timer->timeout_ns <= time_now) {
timer_cb cb = cur_timer->cb; timer_cb cb = cur_timer->cb;
// re-arm cyclic timers // re-arm cyclic timers
if (cur_timer->interval_ns) { if (cur_timer->interval_ns) {
auto new_timer = this->timer_queue.top(); std::lock_guard<std::recursive_mutex> lk(this->timer_queue.get_mtx());
new_timer->timeout_ns = time_now + cur_timer->interval_ns; cur_timer->timeout_ns = time_now + cur_timer->interval_ns;
this->timer_queue.pop(); this->timer_queue.remove_by_id(cur_timer->id);
this->timer_queue.push(new_timer); this->timer_queue.push(cur_timer);
} else { } else {
// remove one-shot timers from queue // remove one-shot timers from queue
this->timer_queue.pop(); this->timer_queue.pop();
@ -143,11 +141,14 @@ uint64_t TimerManager::process_timers()
this->cb_active = false; this->cb_active = false;
// process next timer // process next timer
{ // [ mtx scope
std::lock_guard<std::recursive_mutex> lk(this->timer_queue.get_mtx());
if (this->timer_queue.empty()) { if (this->timer_queue.empty()) {
return 0ULL; return 0ULL;
} }
cur_timer = this->timer_queue.top().get(); cur_timer = this->timer_queue.top();
} // ] mtx scope
} }
// return time slice in nanoseconds until next timer's expiry // return time slice in nanoseconds until next timer's expiry

View File

@ -28,6 +28,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <vector> #include <vector>
#include <mutex>
using namespace std; using namespace std;
@ -50,16 +51,43 @@ template <typename T, class Container = std::vector<T>, class Compare = std::les
class my_priority_queue : public std::priority_queue<T, Container, Compare> { class my_priority_queue : public std::priority_queue<T, Container, Compare> {
public: public:
bool remove_by_id(const uint32_t id){ bool remove_by_id(const uint32_t id){
std::lock_guard<std::recursive_mutex> lk(mtx);
auto el = this->top();
if (el->id == id) {
std::priority_queue<T, Container, Compare>::pop();
return true;
}
auto it = std::find_if( auto it = std::find_if(
this->c.begin(), this->c.end(), [id](const T& el) { return el->id == id; }); this->c.begin(), this->c.end(), [id](const T& el) { return el->id == id; });
if (it != this->c.end()) { if (it != this->c.end()) {
this->c.erase(it); this->c.erase(it);
std::make_heap(this->c.begin(), this->c.end(), this->comp); std::make_heap(this->c.begin(), this->c.end(), this->comp);
return true; return true;
} else {
return false;
} }
return false;
}; };
void push(T val)
{
std::lock_guard<std::recursive_mutex> lk(mtx);
std::priority_queue<T, Container, Compare>::push(val);
};
T pop()
{
std::lock_guard<std::recursive_mutex> lk(mtx);
T val = std::priority_queue<T, Container, Compare>::top();
std::priority_queue<T, Container, Compare>::pop();
return val;
};
std::recursive_mutex& get_mtx()
{
return mtx;
}
private:
std::recursive_mutex mtx;
}; };
typedef struct TimerInfo { typedef struct TimerInfo {
@ -118,8 +146,8 @@ private:
function<uint64_t()> get_time_now; function<uint64_t()> get_time_now;
function<void()> notify_timer_changes; function<void()> notify_timer_changes;
uint32_t id = 0; std::atomic<uint32_t> id{0};
bool cb_active = false; // true if a timer callback is executing bool cb_active = false; // true if a timer callback is executing // FIXME: Do we need this? It gets written in main thread and read in audio thread.
}; };
#endif // TIMER_MANAGER_H #endif // TIMER_MANAGER_H