博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列
阅读量:6264 次
发布时间:2019-06-22

本文共 5064 字,大约阅读时间需要 16 分钟。

一个利用条件变量写的消息队列,基于双缓冲的,虽然相比三缓冲的差距不小,但是还是值得拿来学习一下条件变量。

/* * BufQueue.h * *  Created on: May 30, 2013 *      Author: archy_yu */#ifndef BUFQUEUE_H_#define BUFQUEUE_H_#include 
#include
//#include "CommonStruct.h"typedef void* CommonItem;#define BATPROCESS_NUM 5class BufQueue{public: BufQueue(); virtual ~BufQueue(); int peek(CommonItem &item); int append(CommonItem item); std::list
* serial_read(std::list
* inlist); int set_read_event(); void WaitReadEventByTimeOut(bool& isReadable);private: std::list
* _write_list; std::list
* _read_list; pthread_mutex_t _write_mutex; pthread_mutex_t _read_mutex; pthread_cond_t _read_cond;};#endif /* BUFQUEUE_H_ */
/* * BufQueue.cpp * *  Created on: May 30, 2013 *      Author: archy_yu */#include "BufQueue.h"#include 
#include
void maketimeout(struct timespec* tsp,long ns = 1){ struct timeval now; //get the current time gettimeofday(&now,0); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 1000; //usec to nsec tsp->tv_nsec += 1000*ns;}CommonItem PopMsgToPutFromList( std::list
* pList ){ if(pList == NULL) { return NULL; } if(pList->empty()) { return NULL; } else { CommonItem item = pList->front(); pList->pop_front(); return item; }}BufQueue::BufQueue(){ pthread_mutex_init(&this->_write_mutex,NULL); pthread_mutex_init(&this->_read_mutex,NULL); pthread_cond_init(&this->_read_cond,NULL); this->_read_list = new std::list
(); this->_write_list = new std::list
();}BufQueue::~BufQueue(){ pthread_mutex_destroy(&this->_write_mutex); pthread_mutex_destroy(&this->_read_mutex); pthread_cond_destroy(&this->_read_cond); this->_read_list->clear(); this->_write_list->clear();}int BufQueue::peek(CommonItem &item){ pthread_mutex_lock(&this->_write_mutex); item = PopMsgToPutFromList(this->_read_list); pthread_mutex_unlock(&this->_write_mutex); return 0;}int BufQueue::append(CommonItem item){ if(item == NULL) { return 0; } bool issetread = false; pthread_mutex_lock(&this->_write_mutex); this->_write_list->push_back(item); issetread = (this->_write_list->size() > BATPROCESS_NUM); pthread_mutex_unlock(&this->_write_mutex); if(issetread) { this->set_read_event(); } return 0;}std::list
* BufQueue::serial_read(std::list
* inQueue){ pthread_mutex_lock(&this->_write_mutex); std::list
* tmplist = this->_write_list; this->_write_list = this->_read_list; this->_read_list = tmplist; tmplist = this->_read_list; this->_read_list = inQueue; pthread_mutex_unlock(&this->_write_mutex); return tmplist;}int BufQueue::set_read_event(){ pthread_mutex_lock(&this->_read_mutex); pthread_cond_signal(&this->_read_cond); pthread_mutex_unlock(&this->_read_mutex); return 0;}void BufQueue::WaitReadEventByTimeOut(bool& isReadable){ pthread_mutex_lock(&this->_read_mutex); struct timespec ts; maketimeout(&ts,0); isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts)); pthread_mutex_unlock(&this->_read_mutex);}
 给出测试代码和用法

BufQueue _queue;void* process(void* arg){    int i=0;    while(true)    {        int *j = new int();        *j = i;        _queue.append((void *)j);        i ++;    }    return NULL;}int main(int argc,char* argv[]){    pthread_t pid;    pthread_create(&pid,0,process,0);    long long int start = TimeKit::get_tick();    while(true)    {        list
* queue_to_read = new list
(); bool read = false; _queue.wait_readevent_by_timeout(read); if(read) { queue_to_read = _queue.serial_read(queue_to_read); list
::iterator iter; for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++) { int* j = (int*)(*iter); if(100000 <= (*j)) { long long int end = TimeKit::get_tick(); printf("%ld",(end - start)); return 0; } printf("%d\n",(*j)); } } } /* _recv_net_msg_queue = new DuplexList(); _send_net_msg_queue = new DuplexList(); InputMonitor _recv_thread(_recv_net_msg_queue); OutPutMonitor _send_thread(_send_net_msg_queue); _recv_thread.open("192.168.9.221",6000); _recv_thread.start(); _send_thread.start(); int count = 0; while(true) { MessageBlock* mb = NULL; if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0) { if(count % 1000 == 0) {// printf("process %d msg\n",count); } mb->msg_type(NORMAL_MSG_TYPE); _send_net_msg_queue->append(mb); count ++; } else { usleep(2); } }*/ return 0;}
有兴趣的可以测试下,有什么问题可以联系我!

转载地址:http://cbupa.baihongyu.com/

你可能感兴趣的文章
python中获取指定目录下所有文件名列表的程序
查看>>
HTML5的本地存储 LocalStorage
查看>>
safari和ie的时间解析(显示为NAN)
查看>>
基于 HTML5 WebGL 的挖掘机 3D 可视化应用
查看>>
Java工具创建密钥库,用于Unity 3D打包、签名、发布
查看>>
Oracle用户解锁
查看>>
MongoDB的使用
查看>>
C#开启异步 线程的四种方式
查看>>
XML解析
查看>>
2784: 【提高】小 X 与煎饼达人(flip)
查看>>
Linux 常用的压缩命令有 gzip 和 zip
查看>>
内存分段与分页
查看>>
第一个WindowService服务
查看>>
zookeeper的三种安装模式
查看>>
腾讯2014实习面经 —— 面试经过回忆
查看>>
MIT Scheme 使用 Edwin
查看>>
BZOJ1500:[NOI2005]维修数列——题解
查看>>
transactionscope报“此操作对该事务的状态无效”问题
查看>>
css3(border-radius)边框圆角详解(转)
查看>>
[摘录]第2章 中场谈判技巧
查看>>