生产者消费者模型----基于互斥与同步

在这里插入图片描述
在这里插入图片描述
我们的生产者和消费者,传输数据只是第一步

  1. 数据怎么来的,耗时吗
  2. 数据怎么处理,耗时吗

我们还需要对获得的数据添加一个场景 即对他就行任务处理

如生产者放进去了一个任务(x,y±* /)
消费者拿到了这个任务,就要对这些任务进行处理

BlockQueue.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//.hpp 是 c++里面 在开源软件里面使用,声明和定义都可以放在一个文件里面

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include<unistd.h>
namespace ns_blockqueue//使用一个自己的命名空间
{
const int default_cap = 5;
template <class T>//定义一个类模板

class BlockQueue
{
private:
std::queue<T> _bq; //队列,使用了一个类模板
int _cap; //队列的元素上限
pthread_mutex_t _mutex; //保证数据安全
//当生产满了的时候,就不要生产了,不要生产锁了,让消费者来消费
//当消费空了,就不要消费了,让生产了
//这就要有两个条件变量
pthread_cond_t _full; //_bq满的,就要让消费者来消费,空了就要在这个条件变量等待
pthread_cond_t _empty; //_bq空了,要让生产者来生产,生产者在该条件变量进行等待

private:
bool IsFull()
{
return _bq.size() == _cap;
}

void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void WaitProduct()
{
//因为条件变量和互斥锁都搭配使用,我们进入临界资源,我们就是持有锁的
// 1.调用的时候,会首先自动释放锁资源
// 2.然后再挂起自己,这样别人就可以去申请锁
// 3.返回的时候,会首先竞争锁,获取到锁之后,才能返回

pthread_cond_wait(&_full, &_mutex);
}

bool IsEmpty()
{
return _bq.empty();
}

void WaitConsume()
{
//如果一直抱着锁被挂起的话,就会被永远挂起,死锁

pthread_cond_wait(&_empty, &_mutex);
}
void WakeupConsumer()
{
pthread_cond_signal(&_empty);
}
void WakeupProducer()
{
pthread_cond_signal(&_full);
}

public:
BlockQueue(const int cap = default_cap) //带一个缺省参数
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
// const & 输入型参数
//* 输出型参数
//& 输入输出型参数

//只有消费者知道,生产者什么时候可以生产
void Push(const T &in) //向我们的队列中放数据
{
//在访问临界资源的时候,就应该把数据锁住
LockQueue();
//因为生产者消费者之间都是看同一个队列,所以这一把锁就已经够用了



//临界区
// if (IsFull())
//我们需要进行条件检测的时候,这里需要使用循环的方式
//来保证退出循环一定是条件不满足导致的
while (IsFull())
{
//等待,把线程挂起,我们当前是持有锁的,
//如果队列是空的话就不应该生产了,而是在那里等待

//1. 如果我挂起失败呢,因为函数调用有成功有失败
//函数调用失败


//2. 如果我被伪唤醒呢(条件还没有就绪)
//如果是多核多CPU的,很多线程都在条件变量下等待



WaitProduct();
//我醒来之后要再进行一次判断,判断是否为满,判断成功就往下走,
}
//用if判断,有可能当前队列还是满的,再向下走的话,就会插入一个不应该插入的数据

//生产函数
_bq.push(in);

//唤醒消费者,这里我们制定的策略是,有一个就唤醒一个
if(_bq.size()>_cap/2)//改变策略
WakeupConsumer();

UnLockQueue(); //解锁
}
//只有生产者知道,消费者什么时候可以生产
void Pop(T *out) //向队列里面拿数据
{
LockQueue();
// if(IsEmpty())
while(IsEmpty())
{
//无法消费
WaitConsume();
}
//消费函数
*out = _bq.front(); //拿到队头的元素
_bq.pop(); //删除数据
if(_bq.size()<_cap/2)
WakeupProducer();//唤醒生产者,这里的wakeup放在里面和外面都是可以的
UnLockQueue();
}
};
}

Task.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#pragma once
#include <iostream>
#include <pthread.h>

namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; //表示+-*/%
public:
Task() //无参构造,为了拿任务,不需要参数列表
{
}
//进行函数重载
Task(int x, int y, char op) //有参构造,制造任务
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int Run()
{
int res = 0;
switch (_op)
{
case '+':
res = _x + _y;
break;
case '-':
res = _x - _y;
break;
case '*':
res = _x * _y;
break;
case '/':
res = _x / _y;
break;
case '%':
res = _x % _y;
break;
default:
std::cout << "bug?" << std::endl;
break;
}
std::cout << "当前任务正在被:" << pthread_self() << "处理:" << _x << _op << _y << "=" << res << std::endl;
return res;
}
Task operator=(Task &s)
{
if (this != &s)
{
_x = s._x;
_y = s._y;
_op = s._op;
}
return *this;
}
int operator()()//重载一个函数
{
return Run();
}
};
}

PC.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include"BlockQueue.hpp"
#include"Task.hpp"

using namespace ns_blockqueue;
#include<cstdlib>
#include<ctime>

using namespace ns_task;

void* consumer(void* args)
{
//两个角色已经是有了
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;//拿到了了阻塞队列

while(true)
{
// sleep(2);
// int data=0;//我们想把数据拿出来
// bq->Pop(&data);
// //data就是一个输出型参数
// std::cout<<"消费者消费了一个数据:"<<data<<std::endl;

//消费者这里需要获得任务,无参构造就行了
Task t;
bq->Pop(&t);//任务拿出来了,消费的第一步
//拿出来之后我们就要对任务进行处理
// t.Run();
t();//拿到任务,直接返回,消费的第二步

}
}

void* producer(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;//拿到了了阻塞队列
std::string ops="+-*/%";

while(true)
{
// sleep(2);//按照生产者的节奏来走
//我生产一条他消费一条
//制造数据

//生产者的数据(task)哪里来呢
// int data=rand()%20+1;
// std::cout<<"生产者生产数据:"<<data<<std::endl;
// bq->Push(data);

//1. 生产一个任务
sleep(1);
int x=rand()%20+1;
int y=rand()%10+1;
char op=ops[rand()%5];//0-4之间,进行操作运算
Task t(x,y,op);
std::cout<<"生产派发了一个任务"<<x<<op<<y<<"="<<"?"<<std::endl;

//2.把数据放到队列里面

bq->Push(t);//把任务塞进去了
}
}

int main()
{
srand((long long)time(nullptr));//种了一个随机数种子
BlockQueue<Task>* bq=new BlockQueue<Task>(6);//我们动态开辟的一个空间,因为引入了模板,所以这里我们对他进行实例化一下顺便初始化一下
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c1,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c2,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c3,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c4,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&p,nullptr,producer,(void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}

生产者消费者模型----基于互斥与同步
http://example.com/2022/06/03/生产者消费者模型----基于互斥与同步/
作者
Zevin
发布于
2022年6月3日
许可协议