blob: fd8a4a3993428e40631da6a8b0546a8e31df53d1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
/*
FIFO wait queue for threads (multi-in, single-out).
Copyright (C) 2000 Martin Vogt
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Library General Public License as published by
the Free Software Foundation.
For more information look at the file COPYRIGHT in this package
*/
#include "threadQueue.h"
#define _MAX_THREAD_IN_TQUEUE 5
#include <iostream>
using namespace std;
//
// WaitThreadEntry class [START]
//
// Sets up this entry's wait condition (waitCond) through abs_thread_cond_init
// so that it is ready to be used once the entry is placed in a ThreadQueue.
WaitThreadEntry::WaitThreadEntry()
{
  abs_thread_cond_init(&waitCond);
}
// Tears down the wait condition (waitCond) that the constructor initialized.
WaitThreadEntry::~WaitThreadEntry()
{
  abs_thread_cond_destroy(&waitCond);
}
//
// WaitThreadEntry class [END]
//
// Builds an empty queue: resets the ring-buffer bookkeeping, allocates one
// WaitThreadEntry per slot (_MAX_THREAD_IN_TQUEUE slots in total) and
// initializes the mutex that guards all of this state.
ThreadQueue::ThreadQueue()
{
  // Empty queue: nothing inserted, nothing removed yet.
  size=0;
  insertPos=0;
  removePos=0;

  // Fixed-size table of wait entries; every slot owns its own entry.
  waitThreadEntries=new WaitThreadEntry* [_MAX_THREAD_IN_TQUEUE];
  for (int slot=0; slot<_MAX_THREAD_IN_TQUEUE; ++slot) {
    waitThreadEntries[slot]=new WaitThreadEntry();
  }

  abs_thread_mutex_init(&queueMut);
}
// Destroys the queue. The queue must be empty (size == 0) at this point;
// otherwise a diagnostic is printed and the whole process is terminated via
// exit(0). On the normal path every WaitThreadEntry and the slot table are
// freed while queueMut is held, then the mutex is unlocked and destroyed.
// NOTE(review): the error path exits with status 0 and with queueMut still
// locked -- confirm this is intended.
ThreadQueue::~ThreadQueue()
{
  abs_thread_mutex_lock(&queueMut);

  const bool queueIsEmpty=(size == 0);
  if (!queueIsEmpty) {
    std::cout << "Aieee! Make sure that all threads are out of ThreadQueue"<<std::endl;
    exit(0);
  }

  // Release every per-slot entry, then the table that held them.
  for (int slot=0; slot<_MAX_THREAD_IN_TQUEUE; ++slot) {
    delete waitThreadEntries[slot];
  }
  delete [] waitThreadEntries;

  abs_thread_mutex_unlock(&queueMut);
  abs_thread_mutex_destroy(&queueMut);
}
void ThreadQueue::waitForExclusiveAccess() {
abs_thread_mutex_lock(&queueMut);
if (size == 0) {
abs_thread_mutex_unlock(&queueMut);
return;
}
// wait
size++;
if (size == _MAX_THREAD_IN_TQUEUE) {
cout << "Aieee! ThreadQueue can only buffer:"<<_MAX_THREAD_IN_TQUEUE<<endl;
exit(0);
}
abs_thread_cond_t* waitCond=&(waitThreadEntries[insertPos]->waitCond);
insertPos++;
// wrap counter
if (insertPos == _MAX_THREAD_IN_TQUEUE) {
insertPos=0;
}
abs_thread_cond_wait(waitCond,&queueMut);
abs_thread_mutex_unlock(&queueMut);
}
void ThreadQueue::releaseExclusiveAccess() {
abs_thread_mutex_lock(&queueMut);
if (size == 0) {
abs_thread_mutex_unlock(&queueMut);
return;
}
// wake up next thread
abs_thread_cond_t* waitCond=&(waitThreadEntries[removePos]->waitCond);
removePos++;
// wrap counter
if (removePos == _MAX_THREAD_IN_TQUEUE) {
removePos=0;
}
size--;
abs_thread_cond_signal(waitCond);
abs_thread_mutex_unlock(&queueMut);
}
|