client code:
/**
*
* 客户端的ChannelHandler集合,由子类实现,这样做的好处:
* 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
* 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
* 地获取所有的handlers
*/
public interface ChannelHandlerHolder {
ChannelHandler[] handlers();
}
/**
*
* 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
*/
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
private final Bootstrap bootstrap;
private final Timer timer;
private final int port;
private final String host;
private volatile boolean reconnect = true;
private int attempts;
public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
this.bootstrap = bootstrap;
this.timer = timer;
this.port = port;
this.host = host;
this.reconnect = reconnect;
}
/**
* channel链路每次active的时候,将其连接的次数重新☞ 0
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("当前链路已经激活了,重连尝试次数重新置为0");
attempts = 0;
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(reconnect){
logger.info("链接关闭,将进行重连");
if (attempts < Constants.RET_CONNECT_TIME) {
attempts++;
//重连的间隔时间会越来越长
int timeout = 1 << attempts;
if(timeout>600){
timeout=600;
}
//timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
//int timeout=Constants.RET_CONNECT_TIME_OUT;
timer.newTimeout(this, timeout,TimeUnit.SECONDS);
}
}
else
{
logger.info("链接关闭");
}
ctx.fireChannelInactive();
}
public void run(Timeout timeout) throws Exception {
ChannelFuture future;
//bootstrap已经初始化好了,只需要将handler填入就可以了
synchronized (bootstrap) {
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(handlers());
}
});
future = bootstrap.connect(host,port);
}
//future对象
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) throws Exception {
boolean succeed = f.isSuccess();
//如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
Channel channel=f.channel();
if (!succeed) {
logger.info("重连失败");
channel.pipeline().fireChannelInactive();
}else{
logger.info("重连成功");
//channel.pipeline().fireChannelActive();
//SocketChannel socketChannel=(SocketChannel) channel;
//socketChannel.writeAndFlush(new Message(this.clientId,"CLIENT_START"));
}
}
});
}
public class NettyClient implements InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private int port;
private String host;
private String clientId;
private SocketChannel socketChannel;
protected final HashedWheelTimer timer = new HashedWheelTimer();
@Override
public void afterPropertiesSet() throws Exception {
this.start();
}
public void start() {
logger.info("客户端正在启动---------------");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
//bootstrap.remoteAddress(host,port);
//bootstrap.handler(new NettyClientInitializer(this.clientId));
/*
pipeline.addLast(new IdleStateHandler(
Constants.READ_IDEL_TIME_OUT,
Constants.WRITE_IDEL_TIME_OUT,
Constants.ALL_IDEL_TIME_OUT,
TimeUnit.SECONDS));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(nettyClientHandler);
*/
final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, timer, port,host, true) {
public ChannelHandler[] handlers() {
return new ChannelHandler[] {
this,
new IdleStateHandler(
Constants.CLENT_READ_IDEL_TIME_OUT,
Constants.CLENT_WRITE_IDEL_TIME_OUT,
Constants.CLENT_ALL_IDEL_TIME_OUT,
TimeUnit.SECONDS),
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new NettyClientHandler(clientId)
};
}
};
ChannelFuture future=null;
//进行连接
try {
synchronized (bootstrap) {
bootstrap.remoteAddress(host,port);
bootstrap.handler(new ChannelInitializer<Channel>() {
//初始化channel
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog.handlers());
}
});
future = bootstrap.connect(host,port);
}
// 以下代码在synchronized同步块外面是安全的
future.sync();
if(future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
//socketChannelMap.put(clientId, socketChannel);
//System.out.println("client start,clientId:"+clientId);
//socketChannel.writeAndFlush("Hello Netty Server ,I am a common client");
logger.info("客户端完成启动---------------");
}
} catch (Throwable t) {
logger.info("客户端连接失败---------------");
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
eventLoopGroup.shutdownGracefully();
}
}
public void sendMessage(String topic,String content){
this.sendMessage(topic,content.getBytes());
}
public void sendMessage(String topic,byte[] data){
Message message=new Message(this.clientId,topic);
message.setData(data);
this.sendMessage(message);
}
public void sendMessage(Message message) {
if (this.socketChannel!=null&&this.socketChannel.isOpen()) {
if(StringUtils.isEmpty(message.getClientId())){
message.setClientId(this.clientId);
}
socketChannel.writeAndFlush(message);
/*
ChannelFuture future=this.socketChannel.writeAndFlush(message);
future.addListener(new ChannelFutureListener(){
public void operationComplete(final ChannelFuture future)throws Exception{
//printTime("connect结束: ");
}
});
*/
}
else{
logger.error("客户端未连接服务器,发送消息失败!");
}
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
/*
1)客户端连接服务端
2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s
3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)
4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道
5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路
6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连
*/
@ChannelHandler.Sharable
public class NettyClientHandler extends DefaultClientHandler<Message> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private int pingTime=0;//心跳次数
Message pingMessage=null;//心跳数据
String clientId="";
public NettyClientHandler(String clientId) {
this.clientId=clientId;
pingMessage=new Message(clientId,"Ping");
}
//利用写空闲发送心跳检测消息
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case WRITER_IDLE:
//发送心跳到服务器
//一分钟发送一次心跳
pingTime++;
//普通心跳不传数据
pingMessage.setData("");
PingMessageHelper.setMessageData(pingTime,pingMessage);
//一天重置一次
if(pingTime==1024){
pingTime=0;
}
ctx.writeAndFlush(pingMessage);
logger.info("客户端发送心跳----------");
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Constants.CLIENT_NETWORK_STATUS=1;
logger.info("-------客户端上线----------");
//发送登录消息
SocketChannel socketChannel=(SocketChannel) ctx.channel();
socketChannel.writeAndFlush(new Message(this.clientId,"CLIENT_START"));
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//System.out.println("停止时间是:"+new Date());
//System.out.println("HeartBeatClientHandler channelInactive");
Constants.CLIENT_NETWORK_STATUS=0;
logger.info("-------客户端下线----------");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
//客户端通道
SocketChannel socketChannel=(SocketChannel) ctx.channel();
this.producerRun(socketChannel,msg.getClientId(),msg.getTopic(),msg.getData());
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//Channel incoming = ctx.channel();
//logger.info("SimpleChatClient:"+incoming.remoteAddress()+"异常");
//cause.printStackTrace();
ctx.close();
}
public class NettyClientInitializer extends ChannelInitializer<SocketChannel>{
private NettyClientHandler nettyClientHandler;
NettyClientInitializer(String clientId){
nettyClientHandler = new NettyClientHandler(clientId);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline= socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(
Constants.CLENT_READ_IDEL_TIME_OUT,
Constants.CLENT_WRITE_IDEL_TIME_OUT,
Constants.CLENT_ALL_IDEL_TIME_OUT,
TimeUnit.SECONDS));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(nettyClientHandler);
}
}
listener code
/*
* 通知客户端升级APP
*/
public class AppUpdateMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(AppUpdateMessageListener.class);
@Override
public void onReceiveMessages(byte[] data) {
String json=this.data2String(data);
if(StringUtils.isEmpty(json)){
return;
}
JSONObject jsonObj=JSONObject.fromObject(json);
//升级时间
Long time=jsonObj.getLong("time");
//升级设备组号
String deviceNumber=jsonObj.getString("deviceNumber");
}
/*
* 检查网络状态
*/
public class CheckNetworkMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(CheckNetworkMessageListener.class);
IDeviceService deviceService;
@Override
public void onReceiveMessages(byte[] data) {
String strData=this.data2String(data);
if(strData.equals("s2c")){
logger.info("客户端["+this.getClientId()+"]接收到服务器端网络检查消息");
//服务器发到客户端的消息
//客户端回复给服务器
Message t = new Message(this.getClientId(),"CheckNetwork");
t.setData("c2s");
this.sendMessage(t);
}
else if(strData.equals("c2s")){
//客户端发到服务器的消息
//表示客户端与服务器连接成功
if(deviceService==null){
deviceService = ContextUtil.getBeanByName(IDeviceService.class, "deviceService");
}
String facDevNo=Constants.getFactoryDevNo(this.getClientId());
logger.info("服务器端接收到客户端["+facDevNo+"]回复的网络检查消息");
Device device=deviceService.getByFacDevNo(facDevNo);
//Device device=deviceService.findDeviceByDevNo(devNo);
//deviceService.updateDeviceNetWorkStatus(facDevNo,DeviceNetworkLogEnum.OnLine.getStatus());
if(device.getNetWorkState()==null||device.getNetWorkState().intValue()==DeviceNetworkLogEnum.OffLine.getStatus()){
int count=deviceService.updateDeviceNetworkStatus(device,DeviceNetworkLogEnum.OnLine.getStatus());
if(count>0){
logger.info("设备["+facDevNo+"]状态已更新为在线状态");
}
}
}
}
/*
* 启动的监听器
*/
public class client_startMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(client_startMessageListener.class);
@Override
public void onReceiveMessages(byte[] data) {
logger.info("{}客户端启动连接成功......",this.getClientId());
}
}
public class ClientTestMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(ClientTestMessageListener.class);
@Override
public void onReceiveMessages(byte[] data) {
logger.info("来自客户端的消息:"+this.data2String(data));
}
}
public abstract class DefaultMessageListener implements IMessageListener {
//private static final Logger logger = LoggerFactory.getLogger(DefaultMessageListener.class);
private SocketChannel executor;
private String clientId;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public void setExecutor(SocketChannel executor) {
this.executor = executor;
}
@Override
public void recieveMessage(byte[] data){
this.onReceiveMessages(data);
}
public abstract void onReceiveMessages(byte[] data);
@Override
public SocketChannel getExecutor() {
return this.executor;
}
public String data2String(byte[] data) {
String s="";
try {
s=new String(data,Constants.CHAR_SET);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return s;
}
public void sendMessage(Message message) {
if (this.executor!=null&&this.executor.isOpen()) {
//logger.info("服务器向客户端回复");
this.executor.writeAndFlush(message);
}
else{
//logger.error("客户端未连接服务器,发送消息失败");
}
}
public static Object newInsatnce(String name) {
Object result = null;
try {
String clsName="com.vendor.netty.listener."+name+"MessageListener";
//Class c = Class.forName(clsName);
Class cls=Class.forName(clsName, true, DefaultMessageListener.class.getClassLoader());
result=cls.newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return result;
}
/*
* 服务器端监听客户段网络状态
*/
public class NetworkMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(NetworkMessageListener.class);
private static DeviceWarnService deviceWarnService;
@Override
public void onReceiveMessages(byte[] data) {
String factoryDevNo=Constants.getFactoryDevNo(this.getClientId());
if(StringUtils.isEmpty(factoryDevNo)){
return;
}
int status=Integer.parseInt(data2String(data));
logger.info("客户端[{}]链接{}",factoryDevNo,status==1?"打开":"断开");
if(deviceWarnService==null){
deviceWarnService = ContextUtil.getBeanByName(DeviceWarnService.class, "deviceWarnService");
}
deviceWarnService.updateDeviceNetworkStatus(factoryDevNo,status);
/*
String offlineDeviceKey="OFFLINE_DEVICE_"+factoryDevNo;
int timeOut=1;
//定时任务处理
NettyDeviceStatusTimerTask nettyTimerTask=new NettyDeviceStatusTimerTask(deviceWarnService);
nettyTimerTask.setCacheKey(offlineDeviceKey);
nettyTimerTask.setFactoryDevNo(factoryDevNo);
nettyTimerTask.setStatus(status);
//如果是离线状态,延时一分钟更新
if(status==0){
timeOut=70;
RedisKit.setCache(offlineDeviceKey,status,timeOut-10);
}
else if(status==1){
timeOut=1;
//获取上次设备离线状态
Integer cacheStatus=RedisKit.getCache(offlineDeviceKey);
if(cacheStatus!=null){
//在一分钟内设备又上线了
RedisKit.deleteCache(offlineDeviceKey);
}
//设备状态更新
//this.updateDeviceStatus(facDevNo,status);
}
else{
timeOut=1;
}
TimeOutTaskCreator.createTask(nettyTimerTask,timeOut,NettyMessageTimerTask.UNIT);
*/
}
public class PingMessageHelper {
public static void setMessageData(int pingTime, Message pingMessage) {
//时间由大到小
String data="";
if(pingTime%10==0){
//10分钟业务处理
data=pingTime+"分钟业务处理";
}
//5分钟业务处理
else if(pingTime%5==0){
data=pingTime+"分钟业务处理";
}
else{
//常规心跳
//一分钟一次
data="";
}
pingMessage.setData(StringUtils.isEmpty(data)?null:data.getBytes());
}
}
public class PingMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(PingMessageListener.class);
@Override
public void onReceiveMessages(byte[] data) {
if(data==null){
//心跳,检查客户端是否存活
/*
logger.info("服务器端接受到Ping:"+this.getClientId());
Message apply=new Message(this.getClientId(), "ToClient");
apply.setData("Ping:服务器端回复客户端".getBytes());
this.sendMessage(apply);
*/
}
else{
//业务数据处理
//logger.info(new String(data));
//fingerjk_7101444444
//fingerjk_93006033
//fingerjk_93006032
//fingerjk_93006041
}
}
}
/*
* 通知客户端上传视频文件
*/
public class UploadVideoMessageListener extends DefaultMessageListener{
private static final Logger logger = LoggerFactory.getLogger(UploadVideoMessageListener.class);
@Override
public void onReceiveMessages(byte[] data) {
String orderNo=this.data2String(data);
}
}
server code
@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
private int loss_connect_time = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state=event.state();
if (state==IdleState.READER_IDLE) {
loss_connect_time++;
logger.info(String.valueOf(Constants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"秒没有接收到客户端的信息了");
if(loss_connect_time>=Constants.MAX_LOSS_CONNECT_TIME){
logger.info("------------服务器主动关闭客户端链路");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx,evt);
}
}
}
public class NettyChannelMap {
public static volatile CommonMap<SocketChannel> map=null;
public static CommonMap<SocketChannel> getChannelMap() {
if (map == null) {
synchronized (CommonMap.class) {
map=new CommonMap<SocketChannel>();
}
}
return map;
}
public synchronized static void add(String clientId,SocketChannel socketChannel){
getChannelMap().put(clientId, socketChannel);
}
public synchronized static SocketChannel get(String clientId){
return getChannelMap().get(clientId);
}
public synchronized static String get(SocketChannel socketChannel){
return getChannelMap().get(socketChannel);
}
public synchronized static SocketChannel remove(String clientId){
return getChannelMap().remove(clientId);
}
public synchronized static String remove(SocketChannel socketChannel){
return getChannelMap().remove(socketChannel);
}
public synchronized static boolean contains(String clientId) {
return getChannelMap().contains(clientId);
}
}
@Component("nettyConfigue")
public class NettyConfigue implements ApplicationListener<ContextRefreshedEvent>{
private static final Logger logger = LoggerFactory.getLogger(NettyConfigue.class);
@Autowired NettyServer nettyServer;
@Value("${netty.port}")
private Integer netty_port;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
logger.info("NettyConfigue 端口配置:{}",netty_port);
if(netty_port==null){
return;
}
if(netty_port!=null){
/*
new Thread(new Runnable() {
@Override
public void run() {
if(!nettyServer.isSuccess()){
nettyServer.setPort(netty_port);
nettyServer.start();
}
}
}).start();
*/
logger.info("NettyConfigue 延迟一分钟启动");
long delay=60000;
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
//启动socket监听
if(!nettyServer.isSuccess()){
nettyServer.setPort(netty_port);
nettyServer.start();
}
}
}, delay);
}
else{
logger.error("NettyConfigue端口配置错误:{}",netty_port);
}
}
}
/*
* 更新设备状态
*/
public class NettyDeviceStatusTimerTask implements TimerTask{
private static final Logger logger = LoggerFactory.getLogger(NettyDeviceStatusTimerTask.class);
DeviceWarnService deviceWarnService=null;
String cacheKey="";
String factoryDevNo="";
int status=0;
public void setCacheKey(String cacheKey) {
this.cacheKey = cacheKey;
}
public void setFactoryDevNo(String factoryDevNo) {
this.factoryDevNo = factoryDevNo;
}
public void setStatus(Integer status) {
this.status = status;
}
public NettyDeviceStatusTimerTask(DeviceWarnService deviceWarnService) {
this.deviceWarnService=deviceWarnService;
}
@Override
public void run(Timeout timeout) throws Exception {
synchronized(this){
logger.info("客户端[factoryDevNo:{},status:{}]",this.factoryDevNo,this.status);
if(this.status==0){
//离线
//一分钟后执行的
//判断缓存中是否存在
Integer cacheStatus=RedisKit.getCache(this.cacheKey);
logger.info("cacheStatus:{}",cacheStatus);
if(cacheStatus!=null&&cacheStatus==0){
//离线超过一分钟
this.updateDeviceStatus(this.factoryDevNo,this.status);
}
}
else{
//上线
//立马上线
this.updateDeviceStatus(this.factoryDevNo,this.status);
}
}
}
void updateDeviceStatus(String facDevNo, int status) {
deviceWarnService.updateDeviceNetworkStatus(facDevNo,status);
}
/*
private int getHour(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
return calendar.get(Calendar.HOUR_OF_DAY);
}
*/
}
public enum NettyListenerTopic {
//上传视频
UPLOAD_VIDEO("UploadVideo",1),
//售卖app升级
APP_UPDATE("AppUpdate",2),
/*
* 客户端检查网络
*/
CHECK_NETWORK("CheckNetwork",3)
;
private String name;
private int code;
private NettyListenerTopic(String name, int code) {
this.name = name;
this.code = code;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
/**
* @ClassName: NettyMessagePush
* @Description: NettyMessagePush 消息推送
* @author:
* @date: 2019年5月28日 下午3:53:32
*/
@Service
public class NettyMessagePush {
private static final Logger logger = LoggerFactory.getLogger(NettyMessagePush.class);
private static final String PREFIX = "fingerjk_";
/**
* @Title: push
* @Description: NettyPush
* @author:
* @date: 2019年5月28日 下午4:05:26
* @return: void
*/
public void push(List<String> devNos, String jsonMessage) {
logger.info("devNos={},jsonMessage={}", devNos, jsonMessage);
try {
//测试版本
for (String devNo : devNos) {
SocketChannel socketChannel = NettyChannelMap.get(PREFIX + devNo);
if (null == socketChannel || !socketChannel.isOpen()) {
logger.error("-------------devNo={},jsonMessage={},严重error! 要重新连接 null==socketChannel || ! socketChannel.isOpen()------------------",devNo,jsonMessage);
return;
}
logger.info("socketChannel={}", socketChannel);
ChannelFuture cf = socketChannel.writeAndFlush(jsonMessage);
logger.info("returnResult={}", JSON.toJSONString(cf));
}
} catch (Exception e) {
logger.error("failed devNos={},jsonMessage={},e={}", devNos, jsonMessage, e);
}
}
}
*
* 从缓存中检测消息发送
*/
public class NettyMessageTimerTask implements TimerTask{
private static final Logger logger = LoggerFactory.getLogger(NettyMessageTimerTask.class);
public static final long DELAY=1;
public static final TimeUnit UNIT = TimeUnit.SECONDS;
/*
* 客户端通道集合
NettyChannelMap nettyChannelMap=null;
public void setChannelMap(NettyChannelMap nettyChannelMap) {
this.nettyChannelMap=nettyChannelMap;
}
*/
@Override
public void run(Timeout timeout) throws Exception {
//logger.info("NettyTimerTask timeout "+NettyTimerTask.DELAY);
//处理任务
//if(Constants.TASK_QUEUE_HAS){
processChannelTask();
//Constants.TASK_QUEUE_HAS=false;
//}
//Make this task recurring
//再次执行
TimeOutTaskCreator.createTask(this,DELAY,UNIT);
}
private void processChannelTask() {
List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
if(tasks==null||tasks.size()<1){
//logger.info("暂无向客户端发送任务消息...");
return;
}
int count=tasks.size();
for(int n=count-1;n>-1;n--){
Message message=tasks.get(n);
String clientId=message.getClientId();
SocketChannel socketChannel=NettyChannelMap.get(clientId);
if (socketChannel==null||!socketChannel.isOpen()) {
continue;
}
logger.info("向客户端发送任务消息:"+message.toString());
socketChannel.writeAndFlush(message);
tasks.remove(n);
}
if(tasks.size()!=count){
RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
}
}
@Service("nettyServer")
public class NettyServer implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
EventLoopGroup boss =null;
EventLoopGroup worker =null;
ChannelFuture future=null;
int port=8888;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public void afterPropertiesSet() throws Exception {
//this.start();
}
public boolean isSuccess(){
if(future==null){
return false;
}
return future.isSuccess();
}
@PreDestroy
public void stop(){
if(future!=null){
future.channel().close().addListener(ChannelFutureListener.CLOSE);
future.awaitUninterruptibly();
boss.shutdownGracefully();
worker.shutdownGracefully();
future=null;
}
// Close down all the timer tasks.
TimeOutTaskCreator.stop();
}
/*@PostConstruct*/
public void start() {
logger.info("nettyServer 正在启动---------------");
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new NettyServerInitializer());
try {
future= bootstrap.bind(port).sync();
if(future.isSuccess()) {
logger.info("nettyServer 完成启动---------------");
//定时任务处理
NettyMessageTimerTask nettyTimerTask=new NettyMessageTimerTask();
//nettyTimerTask.setChannelMap(nettyChannelMap);
// Start up a recurring task to log throughput performance metrics.
//NettyTimerTask.DELAY
//延迟1分钟启动
TimeOutTaskCreator.createTask(nettyTimerTask,60,NettyMessageTimerTask.UNIT);
}
else{
logger.info("nettyServer 启动失败---------------{}",port);
}
// 等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch(Exception e) {
logger.info("nettyServer 启动时发生异常---------------{}",port);
logger.info(e.getMessage());
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public void sendMessage(String clientId,String topic,String content){
this.sendMessage(clientId,topic,content.getBytes());
}
public void sendMessage(String clientId,String topic,byte[] data){
Message message=new Message(clientId,topic);
message.setData(data);
this.sendMessage(message);
}
public void sendMessage(Message message) {
String clientId=message.getClientId();
if(StringUtils.isEmpty(clientId)){
logger.info("发送消息到客户端,客户ID不能为空");
return;
}
SocketChannel socketChannel=NettyChannelMap.get(clientId);
if (socketChannel!=null&&socketChannel.isOpen()) {
socketChannel.writeAndFlush(message);
}
else{
logger.info("客户端未连接服务器,发送消息失败");
}
}
/*
* 增加任务消息
*/
public void addTaskMessage(NettyTaskMessage taskMessage) {
if(!StringUtil.vaildParam(taskMessage.clientType,taskMessage.getTopic(),taskMessage.getFactoryDevNos())){
return;
}
if(!this.isSuccess()){
logger.info("nettyServer未启动");
return;
}
//增加到缓存
List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
if(tasks==null){
tasks=new ArrayList<Message>();
}
String content=ContextUtil.getJson(taskMessage.getData());
for(String factoryDevNo:taskMessage.getFactoryDevNos()){
//构造客户ID
String clientId=Constants.getClientId(taskMessage.clientType, factoryDevNo);
//构造消息
Message message=new Message(clientId,taskMessage.getTopic().getName());
message.setData(content);
tasks.add(message);
}
RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
//logger.info("增加处理任务:"+message.toString());
}
/*
* 增加任务消息
public void addTaskMessage(ClientType clientType, String factoryDevNo, NettyListenerTopic topic, String content) {
//构造客户ID
String clientId=Constants.getClientId(clientType, factoryDevNo);
//构造消息
Message message=new Message(clientId,topic.getName());
message.setData(content);
//增加到缓存
List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
if(tasks==null){
tasks=new ArrayList<Message>();
}
tasks.add(message);
RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
logger.info("增加处理任务:"+message.toString());
}
*/
}
@ChannelHandler.Sharable
public class NettyServerHandler extends DefaultClientHandler<Message> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/*
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel socketChannel=(SocketChannel) ctx.channel();
String clientId=this.nettyChannelMap.get(socketChannel);
logger.info("客户端连接--channelActive---"+clientId);
//客户端网络连接
this.network(socketChannel,clientId,1);
}
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SocketChannel socketChannel=(SocketChannel) ctx.channel();
String clientId=NettyChannelMap.get(socketChannel);
//客户端网络断开
this.network(socketChannel,clientId,0);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
//客户端ID
String clientId = msg.getClientId();
//主题
String topic = msg.getTopic();
if(topic.equals("CLIENT_START")){
logger.info("客户端连接--CLIENT_START");
//客户端通道
SocketChannel socketChannel=(SocketChannel) ctx.channel();
//客户端网络恢复
//this.producerRun(socketChannel,clientId,"Network","1".getBytes());
this.network(socketChannel,clientId,1);
}
else if(topic.equals("Ping")){
//对客户端的心跳不处理
logger.info("-------------------------客户端心跳不回复(一分钟Ping一次),clientId={}-------------------------------",clientId);
}
else{
//消息通道
SocketChannel socketChannel=NettyChannelMap.get(clientId);
this.producerRun(socketChannel,clientId,msg.getTopic(),msg.getData());
}
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
private void network(SocketChannel socketChannel, String clientId, int status) {
if(status==1){
//客户端登录
//保存客户端通道,先移除,后添加
NettyChannelMap.remove(clientId);
NettyChannelMap.add(clientId, socketChannel);
}
else{
NettyChannelMap.remove(clientId);
}
logger.info("客户端clientId={},status={} ",clientId,(status==1?"连接":"断开"));
this.producerRun(socketChannel,clientId,"Network",String.valueOf(status).getBytes());
}
/*
@Override
public void handlerRemoved(ChannelHandlerContext ctx)throws Exception{
Channel incoming = ctx.channel();
for (Channel channel : channels)
{
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress()
+ " 离开\n");
}
channels.remove(ctx.channel());
logger.info("客户端离开");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)throws Exception{
Channel incoming = ctx.channel();
for (Channel channel : channels)
{
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress()
+ " 加入\n");
}
channels.add(ctx.channel());
logger.info("客户端加入");
}
*/
}
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
final AcceptorIdleStateTrigger idleStateTrigger=new AcceptorIdleStateTrigger();
final NettyServerHandler nettyServerHandler=new NettyServerHandler();
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(
Constants.SERVER_READ_IDEL_TIME_OUT,
Constants.SERVER_WRITE_IDEL_TIME_OUT,
Constants.SERVER_ALL_IDEL_TIME_OUT,
TimeUnit.SECONDS));
pipeline.addLast(idleStateTrigger);
// 字符串解码 和 编码
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast("encoder", new ObjectEncoder());
// 自己的逻辑Handler
pipeline.addLast(nettyServerHandler);
}
}
@Getter
@Setter
public class NettyTaskMessage {
ClientType clientType;
List<String> factoryDevNos;
NettyListenerTopic topic;
Object data;
@SuppressWarnings("unchecked")
public void put(String key,Object v)
{
if(data==null){
data=new HashMap<String,Object>();
}
((Map<String,Object>)data).put(key, v);
}
}
public class TimeOutTaskCreator {
//private static final Logger logger = LoggerFactory.getLogger(TimeOutTaskCreator.class);
private static HashedWheelTimer instance;
private static synchronized HashedWheelTimer getInstance() {
if (instance == null) {
instance = new HashedWheelTimer();
}
return instance;
}
public static Timeout createTask(TimerTask task, long delay, TimeUnit unit) {
return getInstance().newTimeout(task, delay, unit);
}
public static void stop() {
if (instance != null)
instance.stop();
}
}
netty根目录下:
public class Client2 {
public static void main(String[] args) throws InterruptedException{
//clientMsg("001");
//clientMsg("002");
clientMsg(Constants.getClientId(ClientType.FINGERJK,"93003637009"));
}
private static void clientMsg(String clientId) throws InterruptedException {
//服务启动
NettyClient nettyClient=new NettyClient();
nettyClient.setPort(Constants.SERVER_PORT);
nettyClient.setHost("172.16.1.70");
nettyClient.setClientId(clientId);
nettyClient.start();
TimeUnit.SECONDS.sleep(10);
//发送消息样例
Message t = new Message(clientId,"ClientTest");
t.setData("我是客户端 "+clientId+" 发送的消息");
//sc.writeAndFlush(t);
nettyClient.sendMessage(t);
}
}
public enum ClientType {
FINGERJK,//守护进程
MAP, //地图
MANAGER //管理端
}
public class Constants {
public static int CLIENT_NETWORK_STATUS=0;
public static final int SERVER_PORT = 6667;
public static final int CLENT_READ_IDEL_TIME_OUT=0; // 读超时 20s
public static final int CLENT_WRITE_IDEL_TIME_OUT=60; // 写超时 10s
public static final int CLENT_ALL_IDEL_TIME_OUT=0; // 所有超时 0s
public static final int SERVER_READ_IDEL_TIME_OUT=61; // 读超时 20s
public static final int SERVER_WRITE_IDEL_TIME_OUT=0; // 写超时 10s
public static final int SERVER_ALL_IDEL_TIME_OUT=0; // 所有超时 0s
public static final int MAX_LOSS_CONNECT_TIME=5; // 最大失联次数
public static final int RET_CONNECT_TIME_OUT=60; // 客户端重连超时60s
public static final int RET_CONNECT_TIME=1440; // 客户端重连次数,重连一整天
public static final String CHAR_SET="UTF-8"; //字符编码
/*
* 任务队列关键词
*/
public static final String TASK_QUEUE_KEY = "NETTY_TASK_QUEUE";
public static final long TASK_QUEUE_KEY_TIME_OUT=3600; //任务数据过期时间(1小时)
/*
* 从客户ID获取机器编号
*/
public static String getFactoryDevNo(String clientid){
return clientid.substring(clientid.indexOf("_")+1, clientid.length());
}
/*
* 由客户类型及机器码,构建客户ID
*/
public static String getClientId(ClientType clientType,String factoryDevNo){
return clientType.toString().toLowerCase()+"_"+factoryDevNo;
}
public abstract class DefaultClientHandler<T> extends SimpleChannelInboundHandler<T> {
private Map<String,DefaultMessageListener> map = new ConcurrentHashMap<String,DefaultMessageListener>();
public void add(String topic,DefaultMessageListener messageListener){
map.put(topic, messageListener);
}
public DefaultMessageListener get(String topic){
return map.get(topic);
}
public void producerRun(SocketChannel socketChannel,String clientId,String topic,byte[] data) {
//发射对象实例
DefaultMessageListener defaultMessageListener=this.get(topic);
if(defaultMessageListener==null){
defaultMessageListener=(DefaultMessageListener)DefaultMessageListener.newInsatnce(topic);
if(defaultMessageListener!=null){
this.add(topic, defaultMessageListener);
}
}
//执行
if(defaultMessageListener!=null){
defaultMessageListener.setClientId(clientId);
defaultMessageListener.setExecutor(socketChannel);
defaultMessageListener.recieveMessage(data);
}
}
public interface IMessageListener {
public void recieveMessage(byte[] data);
public SocketChannel getExecutor();
}
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
private String clientId="";
private String topic="";
private byte[] data=null;
public Message(String clientId,String topic)
{
this.clientId=clientId;
this.topic=topic;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public byte[] getData() {
return data;
}
public void setData(String content){
try {
this.data = content.getBytes(Constants.CHAR_SET);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setData(byte[] data) {
this.data = data;
}
@Override
public String toString() {
String s=data==null?"":new String(data);
return "Message [clientId=" + clientId + ", topic=" + topic + ", data=" + s + "]";
}
Constants.java
public static int CLIENT_NETWORK_STATUS=0;
public static final int SERVER_PORT = 6667;
public static final int CLENT_READ_IDEL_TIME_OUT=0; // 读超时 20s
public static final int CLENT_WRITE_IDEL_TIME_OUT=60; // 写超时 10s
public static final int CLENT_ALL_IDEL_TIME_OUT=0; // 所有超时 0s
public static final int SERVER_READ_IDEL_TIME_OUT=61; // 读超时 20s
public static final int SERVER_WRITE_IDEL_TIME_OUT=0; // 写超时 10s
public static final int SERVER_ALL_IDEL_TIME_OUT=0; // 所有超时 0s
public static final int MAX_LOSS_CONNECT_TIME=5; // 最大失联次数
public static final int RET_CONNECT_TIME_OUT=60; // 客户端重连超时60s
public static final int RET_CONNECT_TIME=1440; // 客户端重连次数,重连一整天
public static final String CHAR_SET="UTF-8"; //字符编码
/*
* 任务队列关键词
*/
public static final String TASK_QUEUE_KEY = "NETTY_TASK_QUEUE";
public static final long TASK_QUEUE_KEY_TIME_OUT=3600; //任务数据过期时间(1小时)
/*
* 从客户ID获取机器编号
*/
public static String getFactoryDevNo(String clientid){
return clientid.substring(clientid.indexOf("_")+1, clientid.length());
}
/*
* 由客户类型及机器码,构建客户ID
*/
public static String getClientId(ClientType clientType,String factoryDevNo){
return clientType.toString().toLowerCase()+"_"+factoryDevNo;
}
application-netty.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- <context:annotation-config />
<context:component-scan base-package="com.vendor.netty" init-method="start"/>
-->
<bean id="imServer" class="com.vendor.netty.server.NettyServer" init-method="start">
<property name="port" value="8888"/>
</bean>
</beans>