博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors
阅读量:6527 次
发布时间:2019-06-24

本文共 5569 字,大约阅读时间需要 18 分钟。

背景

通过做以下一个小的接口系统gate,了解一下minajava并发包里的东西。A系统为javaweb项目,BC语言项目,gate是本篇须要完毕的系统。

需求

1. A为集群系统,并发较高,会批量发送给gate消息,而且接受gate返回的消息;

2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B,须要保证性能;

3. B负责处理消息,并返回处理结果,Bgate提供提供六个port,一个port可有三个长连接(须由gate发送心跳保持长连接,否则超时切断连接)。

实例

项目中用到了两个框架mina2.0.7axis2。首先,gate须要接收从A发送过来的消息,为保证消息顺序性,压入队列中,为保证性能,将队列中的消息通过不同的连接发送至B,这让我们非常快就想到了多线程中生产者消费者的那张图,而且这是一个生产者,多个消费者,以下我们来看代码。

首先,gate作为服务端,要为A提供一个接口,使用axis2完毕了,关于webservice就不必多说,可看我前面的博客,配置例如以下:

SendService
cn.net.easyway.customer.SendService

以下是服务实现类:

import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import cn.net.easyway.nds.MsgConsumer;import cn.net.easyway.nds.MsgProducer;/** * 为用户管理系统提供服务接口 * @author yuanfubiao * */public class SendService {		private static Log logger = LogFactory.getLog(SendService.class);		private static int num = 0;	//消息队列	private static LinkedBlockingQueue
msgQueue = new LinkedBlockingQueue
(); //生产者线程池 private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程,应对并发较高的情况 //消费者线程池 private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18); //和连接数相应 /** * 放入消息 * @param list 消息列表 */ public void putMsg(List
list){ //将消息放入队列 executorProducer.execute(new MsgProducer(msgQueue,list)); //取出消息:数据量大,启用全部线程 if(list.size() > 18){ for(int i=0;i<18;i++){ executorCustomer.execute(new MsgConsumer(msgQueue)); } }else{ executorCustomer.execute(new MsgConsumer(msgQueue)); } }}

Java并发包为我们提供了非常多有用的多线程东西,因此没有必要自己去实现一个队列和线程池,如上面代码我们用到的队列是LinkedBlockingQueue,他为线程安全的堵塞队列,多线程操作时不必为了同步而担心,而且会将进出两边自己主动负载,他实现自接口。

jdk中能够看到实现BlockingQueue接口的还有ArrayBlockingQueue,DelayQueue, LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue;此接口就是提供一个堵塞队列,从api中我们看到例如以下一张图:

Throwsexception:当队列已满,再次加入会抛出错误,取数据也是如此;

Specialvalue:加入或取出时会有一个返回值;

Blocks:是在队列已满或为空时,会一直堵塞;

Time Out:指堵塞到一定时间,线程退出;

当中,另一个并发队列也是作为生产者消费者的首选:ConcurrentLinkedQueue,它是非堵塞队列,肯定就不是出自Blockingqueue接口,而是出自AbstractQueue,因此也就没有put和take方法,使用这个并发队列须要有两点注意:第一,推断是否为空尽量使用isEmpty方法,不要用size(),有人測试过size方法非常耗费时间;第二就是线程问题,尽管ConcurrentLinkedQueue是线程安全的,可是仅仅负责原子性的,就是说当你操作queue.add() or queue.poll的时候是安全的,当并发量较大时,你在使用queue.isEmpty时还不为空,但就在这空当有可能就运行poll操作,导致队列为空引起异常,可用例如以下代码:

synchronized(queue) {    if(!queue.isEmpty()) {       queue.poll();    }}

gate中,我定义了两个线程池,一个是生产者,还有一个是消费者:

//生产者线程池	private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程,应对并发较高的情况	//消费者线程池	private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18);

Executors提供了一个工厂方法,用来创建线程池,返回的线程池都实现了ExecutorService接口,能够创建例如以下线程池:

newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用曾经构造的线程,假设如今线程没有可用的,则创建一个新线程加入到池中,终止并从缓存中溢出那些已有60秒未被使用的线程;

newFixedThreadPool(intnThreads):创建固定的线程;

newScheduledThreadPool(intcorePoolSize):创建一个支持定时及周期性的任务运行的线程池;

newSingleThreadExecutor():创建一个单线程的Executor

启动线程,有两个方法,一个是execute(),还有一个是submit(),后者是有返回值的,会将运行的结果Future返回,关于Future可移步

以下就是生产者和消费者代码:

生产者:

import java.util.Iterator;import java.util.List;import java.util.concurrent.LinkedBlockingQueue;/** * 向队列加入消息 * @author yuanfubiao * */public class MsgProducer implements Runnable {	private LinkedBlockingQueue
msgQueue; private List
message; public MsgProducer(LinkedBlockingQueue
queue,List
msg) { this.msgQueue = queue; this.message = msg; } @Override public void run() { Iterator
iter = message.iterator(); while(iter.hasNext()){ String msg = iter.next(); try { msgQueue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }}

消费者:

import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.LinkedBlockingQueue;import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.session.IoSession;/** * 从消息队列取出消息 * @author yuanfubiao * */public class MsgConsumer implements Runnable{	private static Log logger = LogFactory.getLog(MsgConsumer.class);	private LinkedBlockingQueue
msgQueue; public MsgConsumer(LinkedBlockingQueue
queue) { this.msgQueue = queue; } @Override public void run() { while(!msgQueue.isEmpty()){ String msg = null; try { msg = msgQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } if(null == msg){ return; } //增加时间 SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); String now = format.format(new Date()); String prefix = msg.substring(0, 19); String suffix = msg.substring(33, msg.length()); String packet = prefix.trim() + now.trim() + suffix.trim(); //签名部分忽略 //TODO String newStr = packet // 签名 + signature.toUpperCase().trim(); //关于mina,可见我下篇文章 IoSession session = SessionPool.getSession(newStr.substring(13, 15)); logger.info("发送数据:" + newStr); session.write(newStr); try { Thread.sleep(1000); //等待一秒 } catch (InterruptedException e1) { e1.printStackTrace(); } } }}
源代码下载:

你可能感兴趣的文章
IT人的自我导向型学习:开篇杂谈
查看>>
[原创]BizTalk动手实验系列目录
查看>>
HDU 4611Balls Rearrangement(思维)
查看>>
[LeetCode] Majority Element II
查看>>
minGW, cygwin, GnuWin32【C++的跨平台交叉编译问题】
查看>>
我的Dll(动态链接库)学习笔记(转)
查看>>
应用程序域
查看>>
有向图的拓扑排序算法JAVA实现
查看>>
HTML页面跳转的5种方法
查看>>
ArcGIS Engine开发之旅02--ArcGIS Engine中的类库
查看>>
李洪强-C语言5-函数
查看>>
开源监控利器grafana
查看>>
Android获取当前时间与星期几
查看>>
jenkins2 multibranch
查看>>
Css定位-定位
查看>>
sort,uniq命令
查看>>
am335x 电容屏驱动添加。
查看>>
JavaScript Unicode字符操作
查看>>
rhel-server-7.2-x86_64无法联网(VMware环境)
查看>>
Nginx配置中的log_format用法梳理(设置详细的日志格式)
查看>>