netty服务端的代码

2023-11-19

client code:

/**
 * 
 * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
 * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
 * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
 * 地获取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}

 

 

/**
 * 
 * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
    
    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    
    private final String host;

    private volatile boolean reconnect = true;
    private int attempts;
    
    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }
    
    /**
     * channel链路每次active的时候,将其连接的次数重新☞ 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("当前链路已经激活了,重连尝试次数重新置为0");
        
        attempts = 0;
        ctx.fireChannelActive();
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        
        if(reconnect){
            logger.info("链接关闭,将进行重连");
            if (attempts < Constants.RET_CONNECT_TIME) {
                attempts++;
                //重连的间隔时间会越来越长
                int timeout = 1 << attempts;
                if(timeout>600){
                    timeout=600;
                }
                //timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
                //int timeout=Constants.RET_CONNECT_TIME_OUT;
                timer.newTimeout(this, timeout,TimeUnit.SECONDS);
            }
        }
        else
        {
            logger.info("链接关闭");
        }
        ctx.fireChannelInactive();
    }
    public void run(Timeout timeout) throws Exception {
        
        ChannelFuture future;
        //bootstrap已经初始化好了,只需要将handler填入就可以了
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host,port);
        }
        //future对象
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();
                //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
                Channel channel=f.channel();
                if (!succeed) {
                    logger.info("重连失败");
                    channel.pipeline().fireChannelInactive();
                }else{
                    logger.info("重连成功");
                    //channel.pipeline().fireChannelActive();
                    //SocketChannel socketChannel=(SocketChannel) channel;
                    //socketChannel.writeAndFlush(new Message(this.clientId,"CLIENT_START"));
                }
            }
        });
    }

 

public class NettyClient implements InitializingBean{
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
    private int port;
    private String host;
    private String clientId;
    private SocketChannel socketChannel;
    protected final HashedWheelTimer timer = new HashedWheelTimer();
    @Override
    public void afterPropertiesSet() throws Exception {
        this.start();
    }
    public void start() {
        logger.info("客户端正在启动---------------");
        
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        
        //bootstrap.remoteAddress(host,port);
        //bootstrap.handler(new NettyClientInitializer(this.clientId));
        
        /*
        pipeline.addLast(new IdleStateHandler(
                Constants.READ_IDEL_TIME_OUT,
                Constants.WRITE_IDEL_TIME_OUT, 
                Constants.ALL_IDEL_TIME_OUT, 
                TimeUnit.SECONDS));
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        pipeline.addLast(nettyClientHandler);
         */
        final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, timer, port,host, true) {
             public ChannelHandler[] handlers() {
                 return new ChannelHandler[] {
                         this,
                         new IdleStateHandler(
                                 Constants.CLENT_READ_IDEL_TIME_OUT,
                                 Constants.CLENT_WRITE_IDEL_TIME_OUT, 
                                 Constants.CLENT_ALL_IDEL_TIME_OUT, 
                                 TimeUnit.SECONDS),
                         new ObjectEncoder(),
                         new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                         new NettyClientHandler(clientId)
                 };
             }
         };
        
        ChannelFuture future=null;
        //进行连接
        try {
            synchronized (bootstrap) {
                bootstrap.remoteAddress(host,port);
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    //初始化channel
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                future = bootstrap.connect(host,port);
            }

            // 以下代码在synchronized同步块外面是安全的
            future.sync();
            
            if(future.isSuccess()) {
                socketChannel = (SocketChannel) future.channel();
                //socketChannelMap.put(clientId, socketChannel);
                //System.out.println("client start,clientId:"+clientId);
                //socketChannel.writeAndFlush("Hello Netty Server ,I am a common client");
                
                logger.info("客户端完成启动---------------");
            }
        } catch (Throwable t) {
            logger.info("客户端连接失败---------------");
            if (null != future) {
                if (future.channel() != null && future.channel().isOpen()) {
                    future.channel().close();
                }
            }
            eventLoopGroup.shutdownGracefully();
        }
    }
    public void sendMessage(String topic,String content){
        this.sendMessage(topic,content.getBytes());
    }
    public void sendMessage(String topic,byte[] data){
        Message message=new Message(this.clientId,topic);
        message.setData(data);
        this.sendMessage(message);
    }
    public void sendMessage(Message message) {
        if (this.socketChannel!=null&&this.socketChannel.isOpen()) {
            if(StringUtils.isEmpty(message.getClientId())){
                message.setClientId(this.clientId);
            }
            socketChannel.writeAndFlush(message);
            /*
            ChannelFuture future=this.socketChannel.writeAndFlush(message);
            future.addListener(new ChannelFutureListener(){
                public void operationComplete(final ChannelFuture future)throws Exception{
                    //printTime("connect结束: ");
                }
            });
            */
        }
        else{
            logger.error("客户端未连接服务器,发送消息失败!");
        }
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public String getClientId() {
        return clientId;
    }
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

 

 

/*
1)客户端连接服务端
2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s
3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)
4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道
5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路
6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连
 */
@ChannelHandler.Sharable
public class NettyClientHandler extends DefaultClientHandler<Message> {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private int pingTime=0;//心跳次数
    Message pingMessage=null;//心跳数据
    String clientId="";
    public NettyClientHandler(String clientId) {
        this.clientId=clientId;
        pingMessage=new Message(clientId,"Ping");
    }
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    //发送心跳到服务器
                    //一分钟发送一次心跳
                    pingTime++;
                    //普通心跳不传数据
                    pingMessage.setData("");
                    PingMessageHelper.setMessageData(pingTime,pingMessage);
                    //一天重置一次
                    if(pingTime==1024){
                        pingTime=0;
                    }
                    ctx.writeAndFlush(pingMessage);
                    logger.info("客户端发送心跳----------");
                    break;
                default:
                    break;
            }
        }
    }
    @Override  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Constants.CLIENT_NETWORK_STATUS=1;
        logger.info("-------客户端上线----------");
        //发送登录消息
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        socketChannel.writeAndFlush(new Message(this.clientId,"CLIENT_START"));
        
        ctx.fireChannelActive();  
    }
    
    @Override  
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
        //System.out.println("停止时间是:"+new Date());  
        //System.out.println("HeartBeatClientHandler channelInactive");  
        Constants.CLIENT_NETWORK_STATUS=0;
        logger.info("-------客户端下线----------");
    }  
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        //客户端通道
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        this.producerRun(socketChannel,msg.getClientId(),msg.getTopic(),msg.getData());
        ReferenceCountUtil.release(msg);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //Channel incoming = ctx.channel();
        //logger.info("SimpleChatClient:"+incoming.remoteAddress()+"异常");
        //cause.printStackTrace();
        ctx.close();
    }

 

public class NettyClientInitializer extends ChannelInitializer<SocketChannel>{
    private NettyClientHandler nettyClientHandler;
    
    NettyClientInitializer(String clientId){
        nettyClientHandler = new NettyClientHandler(clientId);
    }
    
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline= socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(
                Constants.CLENT_READ_IDEL_TIME_OUT,
                Constants.CLENT_WRITE_IDEL_TIME_OUT, 
                Constants.CLENT_ALL_IDEL_TIME_OUT, 
                TimeUnit.SECONDS));
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        pipeline.addLast(nettyClientHandler);
    }
}

 

listener code

/*
 * 通知客户端升级APP
 */
public class AppUpdateMessageListener extends DefaultMessageListener{     
    private static final Logger logger = LoggerFactory.getLogger(AppUpdateMessageListener.class);
    @Override
    public void onReceiveMessages(byte[] data) {
        String json=this.data2String(data);
        if(StringUtils.isEmpty(json)){
            return;
        }
        JSONObject jsonObj=JSONObject.fromObject(json);
        //升级时间
        Long time=jsonObj.getLong("time");
        //升级设备组号
        String deviceNumber=jsonObj.getString("deviceNumber");
    }

/*
 * 检查网络状态
 */
public class CheckNetworkMessageListener extends DefaultMessageListener{
    private static final Logger logger = LoggerFactory.getLogger(CheckNetworkMessageListener.class);
    IDeviceService deviceService;
    @Override
    public void onReceiveMessages(byte[] data) {
        String strData=this.data2String(data);
        if(strData.equals("s2c")){
            logger.info("客户端["+this.getClientId()+"]接收到服务器端网络检查消息");
            //服务器发到客户端的消息
            //客户端回复给服务器
            Message t = new Message(this.getClientId(),"CheckNetwork");
            t.setData("c2s");
            this.sendMessage(t);
        }
        else if(strData.equals("c2s")){
            //客户端发到服务器的消息
            //表示客户端与服务器连接成功
            if(deviceService==null){
                deviceService = ContextUtil.getBeanByName(IDeviceService.class, "deviceService");
            }
            String facDevNo=Constants.getFactoryDevNo(this.getClientId());
            logger.info("服务器端接收到客户端["+facDevNo+"]回复的网络检查消息");
            
            Device device=deviceService.getByFacDevNo(facDevNo);
            //Device device=deviceService.findDeviceByDevNo(devNo);
            //deviceService.updateDeviceNetWorkStatus(facDevNo,DeviceNetworkLogEnum.OnLine.getStatus());
            if(device.getNetWorkState()==null||device.getNetWorkState().intValue()==DeviceNetworkLogEnum.OffLine.getStatus()){
                int count=deviceService.updateDeviceNetworkStatus(device,DeviceNetworkLogEnum.OnLine.getStatus());
                if(count>0){
                    logger.info("设备["+facDevNo+"]状态已更新为在线状态");
                }
            }
        }
    }

 

/*
 * 启动的监听器
 */
public class client_startMessageListener extends DefaultMessageListener{     
    private static final Logger logger = LoggerFactory.getLogger(client_startMessageListener.class);
    @Override
    public void onReceiveMessages(byte[] data) {
        
        logger.info("{}客户端启动连接成功......",this.getClientId());
    }
    
}

public class ClientTestMessageListener extends DefaultMessageListener{
             
    private static final Logger logger = LoggerFactory.getLogger(ClientTestMessageListener.class);
    @Override
    public void onReceiveMessages(byte[] data) {
        logger.info("来自客户端的消息:"+this.data2String(data));
    }
    
}
 

public abstract class DefaultMessageListener implements IMessageListener {
    //private static final Logger logger = LoggerFactory.getLogger(DefaultMessageListener.class);
    private SocketChannel executor;
    private String clientId;
    
    public String getClientId() {
        return clientId;
    }
    
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
    public void setExecutor(SocketChannel executor) {
        this.executor = executor;
    }
    @Override
    public void recieveMessage(byte[] data){
        this.onReceiveMessages(data);
    }
    public abstract void onReceiveMessages(byte[] data);
    @Override
    public SocketChannel getExecutor() {
        return this.executor;
    }
    
    public String data2String(byte[] data) {
        String s="";
        try {
            s=new String(data,Constants.CHAR_SET);
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return s;
    }
    public void sendMessage(Message message) {
        if (this.executor!=null&&this.executor.isOpen()) {
            //logger.info("服务器向客户端回复");
            this.executor.writeAndFlush(message);
        }
        else{
            //logger.error("客户端未连接服务器,发送消息失败");
        }
    }
    
    public static Object newInsatnce(String name) {
        Object result = null;
        try {
            String clsName="com.vendor.netty.listener."+name+"MessageListener";
            //Class c = Class.forName(clsName);
            Class cls=Class.forName(clsName, true, DefaultMessageListener.class.getClassLoader());
            result=cls.newInstance();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return result;
    }

 

/*
 * 服务器端监听客户段网络状态
 */
public class NetworkMessageListener extends DefaultMessageListener{
    private static final Logger logger = LoggerFactory.getLogger(NetworkMessageListener.class);
    private static DeviceWarnService deviceWarnService;
    @Override
    public void onReceiveMessages(byte[] data) {
        String factoryDevNo=Constants.getFactoryDevNo(this.getClientId());
        if(StringUtils.isEmpty(factoryDevNo)){
            return;
        }
        int status=Integer.parseInt(data2String(data));
        logger.info("客户端[{}]链接{}",factoryDevNo,status==1?"打开":"断开"); 
        
        if(deviceWarnService==null){
            deviceWarnService = ContextUtil.getBeanByName(DeviceWarnService.class, "deviceWarnService");
        }
        deviceWarnService.updateDeviceNetworkStatus(factoryDevNo,status);
        /*
        String offlineDeviceKey="OFFLINE_DEVICE_"+factoryDevNo;
        
        int timeOut=1;
        //定时任务处理
        NettyDeviceStatusTimerTask nettyTimerTask=new NettyDeviceStatusTimerTask(deviceWarnService);
        nettyTimerTask.setCacheKey(offlineDeviceKey);
        nettyTimerTask.setFactoryDevNo(factoryDevNo);
        nettyTimerTask.setStatus(status);
        //如果是离线状态,延时一分钟更新
        if(status==0){
            timeOut=70;
            RedisKit.setCache(offlineDeviceKey,status,timeOut-10);
        }
        else if(status==1){
            timeOut=1;
            //获取上次设备离线状态
            Integer cacheStatus=RedisKit.getCache(offlineDeviceKey);
            if(cacheStatus!=null){
                //在一分钟内设备又上线了
                RedisKit.deleteCache(offlineDeviceKey);
            }
            //设备状态更新
            //this.updateDeviceStatus(facDevNo,status);
        }
        else{
            timeOut=1;
        }
        TimeOutTaskCreator.createTask(nettyTimerTask,timeOut,NettyMessageTimerTask.UNIT);
        */
    }

public class PingMessageHelper {

    public static void setMessageData(int pingTime, Message pingMessage) {
        //时间由大到小
         String data="";
         if(pingTime%10==0){
            //10分钟业务处理
            data=pingTime+"分钟业务处理";
         }
         //5分钟业务处理
         else  if(pingTime%5==0){
             data=pingTime+"分钟业务处理";
         }
         else{
             //常规心跳
             //一分钟一次
             data="";
         }
         
         pingMessage.setData(StringUtils.isEmpty(data)?null:data.getBytes());
    }
}

 

public class PingMessageListener extends DefaultMessageListener{
    private static final Logger logger = LoggerFactory.getLogger(PingMessageListener.class);
    @Override
    public void onReceiveMessages(byte[] data) {
        if(data==null){
            //心跳,检查客户端是否存活
            /*
            logger.info("服务器端接受到Ping:"+this.getClientId());
                        Message apply=new Message(this.getClientId(), "ToClient");
            apply.setData("Ping:服务器端回复客户端".getBytes());
            this.sendMessage(apply);
            */
        }
        else{
            //业务数据处理
            //logger.info(new String(data));
            //fingerjk_7101444444
            //fingerjk_93006033
            //fingerjk_93006032
            //fingerjk_93006041
        }
    }
}

 

/*
 * 通知客户端上传视频文件
 */
public class UploadVideoMessageListener extends DefaultMessageListener{     
    private static final Logger logger = LoggerFactory.getLogger(UploadVideoMessageListener.class);
    @Override
    public void onReceiveMessages(byte[] data) {
        String orderNo=this.data2String(data);
        
    }
    
}

 

server code

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
    private int loss_connect_time = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            IdleState state=event.state();
            if (state==IdleState.READER_IDLE) {
                loss_connect_time++;
                logger.info(String.valueOf(Constants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"秒没有接收到客户端的信息了");
                if(loss_connect_time>=Constants.MAX_LOSS_CONNECT_TIME){
                    logger.info("------------服务器主动关闭客户端链路");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx,evt);
        }
    }
    
}

 

public class NettyChannelMap {
    public static volatile CommonMap<SocketChannel> map=null;
    public static CommonMap<SocketChannel> getChannelMap() {
        if (map == null) {
            synchronized (CommonMap.class) {
                map=new CommonMap<SocketChannel>();
            }
        }
        return map;
    }
    
    public synchronized static  void add(String clientId,SocketChannel socketChannel){
        getChannelMap().put(clientId, socketChannel);
    }
    
    public synchronized static  SocketChannel get(String clientId){
        return getChannelMap().get(clientId);
    }
    public synchronized static String get(SocketChannel socketChannel){
        return getChannelMap().get(socketChannel);
    }

    public synchronized static SocketChannel remove(String clientId){
        return getChannelMap().remove(clientId);
    }
    public synchronized static  String remove(SocketChannel socketChannel){
        return getChannelMap().remove(socketChannel);
    }
    public synchronized static  boolean contains(String clientId) {
        return getChannelMap().contains(clientId);
    }
}

 

@Component("nettyConfigue")
public class NettyConfigue implements ApplicationListener<ContextRefreshedEvent>{
    private static final Logger logger = LoggerFactory.getLogger(NettyConfigue.class);
    @Autowired NettyServer nettyServer;
    
    @Value("${netty.port}")
    private Integer netty_port;
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        logger.info("NettyConfigue 端口配置:{}",netty_port);
        if(netty_port==null){
            return;
        }
        if(netty_port!=null){
            /*
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if(!nettyServer.isSuccess()){
                        nettyServer.setPort(netty_port);
                        nettyServer.start();
                    }
                }
            }).start();
            */
            logger.info("NettyConfigue 延迟一分钟启动");
            long delay=60000;
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                  //启动socket监听
                    if(!nettyServer.isSuccess()){
                          nettyServer.setPort(netty_port);
                          nettyServer.start();
                      }
                }
            }, delay);
        }
        else{
            logger.error("NettyConfigue端口配置错误:{}",netty_port);
        }
        
    }
    
}

 

/*
 * 更新设备状态
 */
public class NettyDeviceStatusTimerTask implements TimerTask{
    private static final Logger logger = LoggerFactory.getLogger(NettyDeviceStatusTimerTask.class);
    DeviceWarnService deviceWarnService=null;
    String cacheKey="";
    String factoryDevNo="";
    int status=0;
    
    public void setCacheKey(String cacheKey) {
        this.cacheKey = cacheKey;
    }
    public void setFactoryDevNo(String factoryDevNo) {
        this.factoryDevNo = factoryDevNo;
    }
    public void setStatus(Integer status) {
        this.status = status;
    }
    public NettyDeviceStatusTimerTask(DeviceWarnService deviceWarnService) {
        this.deviceWarnService=deviceWarnService;
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        synchronized(this){
            logger.info("客户端[factoryDevNo:{},status:{}]",this.factoryDevNo,this.status); 
            if(this.status==0){
                //离线
                //一分钟后执行的
                //判断缓存中是否存在
                Integer cacheStatus=RedisKit.getCache(this.cacheKey);
                logger.info("cacheStatus:{}",cacheStatus); 
                if(cacheStatus!=null&&cacheStatus==0){
                    //离线超过一分钟
                    this.updateDeviceStatus(this.factoryDevNo,this.status);
                }
            }
            else{
                //上线
                //立马上线
                this.updateDeviceStatus(this.factoryDevNo,this.status);
            }
        }
    }
    void updateDeviceStatus(String facDevNo, int status) {
        deviceWarnService.updateDeviceNetworkStatus(facDevNo,status);
    }
    /*
    private int getHour(Date date) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        return calendar.get(Calendar.HOUR_OF_DAY);
    }
    */
}


public enum NettyListenerTopic {
    //上传视频
    UPLOAD_VIDEO("UploadVideo",1),
    //售卖app升级
    APP_UPDATE("AppUpdate",2),
    /*
     * 客户端检查网络
     */
    CHECK_NETWORK("CheckNetwork",3)
    ;
    private String name;
    private int code;
    private NettyListenerTopic(String name, int code) {
        this.name = name;
        this.code = code;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getCode() {
        return code;
    }
    public void setCode(int code) {
        this.code = code;
    }
    
}

 

/**
 * @ClassName: NettyMessagePush
 * @Description: NettyMessagePush 消息推送
 * @author: 
 * @date: 2019年5月28日 下午3:53:32
 */
@Service
public class NettyMessagePush {

    private static final Logger logger = LoggerFactory.getLogger(NettyMessagePush.class);
    private static final String PREFIX = "fingerjk_";

    /**
     * @Title: push
     * @Description: NettyPush
     * @author: 
     * @date: 2019年5月28日 下午4:05:26
     * @return: void
     */
    public void push(List<String> devNos, String jsonMessage) {

        logger.info("devNos={},jsonMessage={}", devNos, jsonMessage);
        try {
            //测试版本
            for (String devNo : devNos) {
                SocketChannel socketChannel = NettyChannelMap.get(PREFIX + devNo);
                if (null == socketChannel || !socketChannel.isOpen()) {
                    logger.error("-------------devNo={},jsonMessage={},严重error! 要重新连接 null==socketChannel || ! socketChannel.isOpen()------------------",devNo,jsonMessage);
                    return;
                }
                logger.info("socketChannel={}", socketChannel);
                ChannelFuture cf = socketChannel.writeAndFlush(jsonMessage);
                logger.info("returnResult={}", JSON.toJSONString(cf));
            }
        } catch (Exception e) {
            logger.error("failed devNos={},jsonMessage={},e={}", devNos, jsonMessage, e);
        }
    }
}

*
 * 从缓存中检测消息发送
 */
public class NettyMessageTimerTask implements TimerTask{
    private static final Logger logger = LoggerFactory.getLogger(NettyMessageTimerTask.class);
    public static final long DELAY=1;
    public static final TimeUnit UNIT = TimeUnit.SECONDS;
    /*
     * 客户端通道集合
    NettyChannelMap nettyChannelMap=null;
    public void setChannelMap(NettyChannelMap nettyChannelMap) {
        this.nettyChannelMap=nettyChannelMap;
    }
    */
    
    @Override
    public void run(Timeout timeout) throws Exception {
        //logger.info("NettyTimerTask timeout "+NettyTimerTask.DELAY);
        //处理任务
        //if(Constants.TASK_QUEUE_HAS){
            processChannelTask();
            //Constants.TASK_QUEUE_HAS=false;
        //}
        //Make this task recurring
        //再次执行
        TimeOutTaskCreator.createTask(this,DELAY,UNIT);
    }
    private void processChannelTask() {
        
        List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
        if(tasks==null||tasks.size()<1){
            //logger.info("暂无向客户端发送任务消息...");
            return;
        }
        int count=tasks.size();
        for(int n=count-1;n>-1;n--){
            Message message=tasks.get(n);
            String clientId=message.getClientId();
            SocketChannel socketChannel=NettyChannelMap.get(clientId);
            if (socketChannel==null||!socketChannel.isOpen()) {
                continue;
            }
            logger.info("向客户端发送任务消息:"+message.toString());
            socketChannel.writeAndFlush(message);
            tasks.remove(n);
        }
        if(tasks.size()!=count){
            RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
        }
    }

@Service("nettyServer")
public class NettyServer implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    EventLoopGroup boss =null;
    EventLoopGroup worker =null;
    ChannelFuture future=null;
    int port=8888;
    
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        //this.start();
    }
    
    public boolean isSuccess(){
        if(future==null){
            return false;
        }
        return future.isSuccess();
    }
    
    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            future=null;
        }
        
        // Close down all the timer tasks.
        TimeOutTaskCreator.stop();
    }
    /*@PostConstruct*/
    public void start() {
        logger.info("nettyServer 正在启动---------------");

        boss = new NioEventLoopGroup();
        worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        // 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new NettyServerInitializer());
        try {
            future= bootstrap.bind(port).sync();
            if(future.isSuccess()) {
                logger.info("nettyServer 完成启动---------------");
                
                //定时任务处理
                NettyMessageTimerTask nettyTimerTask=new NettyMessageTimerTask();
                //nettyTimerTask.setChannelMap(nettyChannelMap);
                // Start up a recurring task to log throughput performance metrics.
                //NettyTimerTask.DELAY
                //延迟1分钟启动
                TimeOutTaskCreator.createTask(nettyTimerTask,60,NettyMessageTimerTask.UNIT);
            }
            else{
                logger.info("nettyServer 启动失败---------------{}",port);
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch(Exception e) {
            logger.info("nettyServer 启动时发生异常---------------{}",port);
            logger.info(e.getMessage());
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
    public void sendMessage(String clientId,String topic,String content){
        this.sendMessage(clientId,topic,content.getBytes());
    }
    public void sendMessage(String clientId,String topic,byte[] data){
        Message message=new Message(clientId,topic);
        message.setData(data);
        this.sendMessage(message);
    }
    public void sendMessage(Message message) {
        String clientId=message.getClientId();
        if(StringUtils.isEmpty(clientId)){
            logger.info("发送消息到客户端,客户ID不能为空");
            return;
        }
        SocketChannel socketChannel=NettyChannelMap.get(clientId);
        if (socketChannel!=null&&socketChannel.isOpen()) {
            socketChannel.writeAndFlush(message);
        }
        else{
            logger.info("客户端未连接服务器,发送消息失败");
        }
    }
    /*
     * 增加任务消息
    */
    public void addTaskMessage(NettyTaskMessage taskMessage) {
        if(!StringUtil.vaildParam(taskMessage.clientType,taskMessage.getTopic(),taskMessage.getFactoryDevNos())){
            return;
        }
        if(!this.isSuccess()){
            logger.info("nettyServer未启动");
            return;
        }
        //增加到缓存
        List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
        if(tasks==null){
            tasks=new ArrayList<Message>();
        }
        String content=ContextUtil.getJson(taskMessage.getData());
        for(String factoryDevNo:taskMessage.getFactoryDevNos()){
            //构造客户ID
            String clientId=Constants.getClientId(taskMessage.clientType, factoryDevNo);
            //构造消息
            Message message=new Message(clientId,taskMessage.getTopic().getName());
            message.setData(content);
            tasks.add(message);
        }
        RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
        //logger.info("增加处理任务:"+message.toString());
    }
    /*
     * 增加任务消息
    public void addTaskMessage(ClientType clientType, String factoryDevNo, NettyListenerTopic topic, String content) {
        //构造客户ID
        String clientId=Constants.getClientId(clientType, factoryDevNo);
        //构造消息
        Message message=new Message(clientId,topic.getName());
        message.setData(content);
        //增加到缓存
        List<Message> tasks=RedisKit.getCache(Constants.TASK_QUEUE_KEY);
        if(tasks==null){
            tasks=new ArrayList<Message>();
        }
        tasks.add(message);
        RedisKit.setCache(Constants.TASK_QUEUE_KEY,tasks,Constants.TASK_QUEUE_KEY_TIME_OUT);
        logger.info("增加处理任务:"+message.toString());
    }
    */
}

 

@ChannelHandler.Sharable
public class NettyServerHandler extends DefaultClientHandler<Message> {
    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    /*
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        String clientId=this.nettyChannelMap.get(socketChannel);
        logger.info("客户端连接--channelActive---"+clientId); 
        //客户端网络连接
        this.network(socketChannel,clientId,1);
    }
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        String clientId=NettyChannelMap.get(socketChannel);
        //客户端网络断开
        this.network(socketChannel,clientId,0);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        //客户端ID
        String clientId = msg.getClientId();
        //主题
        String topic = msg.getTopic();
        if(topic.equals("CLIENT_START")){
            logger.info("客户端连接--CLIENT_START"); 
            //客户端通道
            SocketChannel socketChannel=(SocketChannel) ctx.channel();
            //客户端网络恢复
            //this.producerRun(socketChannel,clientId,"Network","1".getBytes());
            this.network(socketChannel,clientId,1);
        }
        else if(topic.equals("Ping")){
            //对客户端的心跳不处理
            logger.info("-------------------------客户端心跳不回复(一分钟Ping一次),clientId={}-------------------------------",clientId); 
        }
        else{
            //消息通道
            SocketChannel socketChannel=NettyChannelMap.get(clientId);
            this.producerRun(socketChannel,clientId,msg.getTopic(),msg.getData());
        }
        ReferenceCountUtil.release(msg);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }
    private void network(SocketChannel socketChannel, String clientId, int status) {
        if(status==1){
            //客户端登录
            //保存客户端通道,先移除,后添加
            NettyChannelMap.remove(clientId);
            NettyChannelMap.add(clientId, socketChannel);
        }
        else{
            NettyChannelMap.remove(clientId);
        }
        logger.info("客户端clientId={},status={} ",clientId,(status==1?"连接":"断开"));
        
        this.producerRun(socketChannel,clientId,"Network",String.valueOf(status).getBytes());
    }
    /*
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)throws Exception{
        
        Channel incoming = ctx.channel();
        for (Channel channel : channels)
        {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress()
                    + " 离开\n");
        }
        channels.remove(ctx.channel());
        
        logger.info("客户端离开"); 
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx)throws Exception{
        
        Channel incoming = ctx.channel();
        for (Channel channel : channels)
        {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress()
                    + " 加入\n");
        }
        channels.add(ctx.channel());
        
        logger.info("客户端加入"); 
    }
    */
}

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    final AcceptorIdleStateTrigger idleStateTrigger=new AcceptorIdleStateTrigger();
    final NettyServerHandler nettyServerHandler=new NettyServerHandler();
    
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(
                Constants.SERVER_READ_IDEL_TIME_OUT,
                Constants.SERVER_WRITE_IDEL_TIME_OUT, 
                Constants.SERVER_ALL_IDEL_TIME_OUT,
                TimeUnit.SECONDS));
        pipeline.addLast(idleStateTrigger);
        // 字符串解码 和 编码
        pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        pipeline.addLast("encoder", new ObjectEncoder());
        // 自己的逻辑Handler
        pipeline.addLast(nettyServerHandler);
    }
}
 

 


@Getter
@Setter
public class NettyTaskMessage {
    ClientType clientType;
    List<String> factoryDevNos;
    NettyListenerTopic topic;
    Object data;
    
    @SuppressWarnings("unchecked")
    public void put(String key,Object v)
    {
        if(data==null){
            data=new HashMap<String,Object>();
        }
        ((Map<String,Object>)data).put(key, v);
    }
}
 


public class TimeOutTaskCreator {

    //private static final Logger logger = LoggerFactory.getLogger(TimeOutTaskCreator.class);
    private static HashedWheelTimer instance;

    private static synchronized HashedWheelTimer getInstance() {
        if (instance == null) {
            instance = new HashedWheelTimer();
        }

        return instance;
    }

    public static Timeout createTask(TimerTask task, long delay, TimeUnit unit) {
        return getInstance().newTimeout(task, delay, unit);
    }

    public static void stop() {
        if (instance != null)
            instance.stop();
    }
}

netty根目录下:

public class Client2 {
    
    public static void main(String[] args) throws InterruptedException{
        //clientMsg("001");
        //clientMsg("002");
        clientMsg(Constants.getClientId(ClientType.FINGERJK,"93003637009"));
    }
    private static void clientMsg(String clientId) throws InterruptedException {
        //服务启动
        NettyClient nettyClient=new NettyClient();
        nettyClient.setPort(Constants.SERVER_PORT);
        nettyClient.setHost("172.16.1.70");
        nettyClient.setClientId(clientId);
        nettyClient.start();
        
        TimeUnit.SECONDS.sleep(10);
        
        //发送消息样例
        Message t = new Message(clientId,"ClientTest");
        t.setData("我是客户端 "+clientId+" 发送的消息");
        //sc.writeAndFlush(t);
        nettyClient.sendMessage(t);
    }
}

public enum ClientType {
    FINGERJK,//守护进程
    MAP,     //地图
    MANAGER  //管理端
}
 

public class Constants {
    public static int CLIENT_NETWORK_STATUS=0;
    
    public static final int SERVER_PORT = 6667;
    
    public static final int CLENT_READ_IDEL_TIME_OUT=0; // 读超时 20s
    public static final int CLENT_WRITE_IDEL_TIME_OUT=60; // 写超时 10s
    public static final int CLENT_ALL_IDEL_TIME_OUT=0;    // 所有超时 0s
    
    public static final int SERVER_READ_IDEL_TIME_OUT=61; // 读超时 20s
    public static final int SERVER_WRITE_IDEL_TIME_OUT=0;  // 写超时 10s
    public static final int SERVER_ALL_IDEL_TIME_OUT=0;    // 所有超时 0s
    
    
    public static final int MAX_LOSS_CONNECT_TIME=5;  // 最大失联次数
    
    public static final int RET_CONNECT_TIME_OUT=60;  // 客户端重连超时60s
    public static final int RET_CONNECT_TIME=1440;    // 客户端重连次数,重连一整天
    
    public static final String CHAR_SET="UTF-8";      //字符编码

    /*
     * 任务队列关键词
     */
    public static final String TASK_QUEUE_KEY = "NETTY_TASK_QUEUE";
    public static final long TASK_QUEUE_KEY_TIME_OUT=3600;   //任务数据过期时间(1小时)
    /*
     * 从客户ID获取机器编号
     */
    public static String getFactoryDevNo(String clientid){
        return clientid.substring(clientid.indexOf("_")+1, clientid.length());
    }
    /*
     * 由客户类型及机器码,构建客户ID
     */
    public static String getClientId(ClientType clientType,String factoryDevNo){
        return clientType.toString().toLowerCase()+"_"+factoryDevNo;
    }

 

public abstract class DefaultClientHandler<T> extends SimpleChannelInboundHandler<T> {
    
    private Map<String,DefaultMessageListener> map = new ConcurrentHashMap<String,DefaultMessageListener>();
    
    public void add(String topic,DefaultMessageListener messageListener){
        map.put(topic, messageListener);
    }
    
    public DefaultMessageListener get(String topic){
        return map.get(topic);
     }
    
    public void producerRun(SocketChannel socketChannel,String clientId,String topic,byte[] data) {
        //发射对象实例
        DefaultMessageListener defaultMessageListener=this.get(topic);
        if(defaultMessageListener==null){
            defaultMessageListener=(DefaultMessageListener)DefaultMessageListener.newInsatnce(topic);
            if(defaultMessageListener!=null){
                this.add(topic, defaultMessageListener);
            }
        }
        //执行
        if(defaultMessageListener!=null){
            defaultMessageListener.setClientId(clientId);
            defaultMessageListener.setExecutor(socketChannel);
            defaultMessageListener.recieveMessage(data);
        }
    }

public interface IMessageListener {
    public void recieveMessage(byte[] data);
    public SocketChannel getExecutor();
}
 

public class Message implements Serializable {
    private static final long serialVersionUID = 1L;
    private String clientId="";
    private String topic="";
    private byte[] data=null;
    
    public Message(String clientId,String topic)
    {
        this.clientId=clientId;
        this.topic=topic;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public byte[] getData() {
        return data;
    }

    public void setData(String content){
        try {
            this.data = content.getBytes(Constants.CHAR_SET);
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    public void setData(byte[] data) {
        this.data = data;
    }

    @Override
    public String toString() {
        String s=data==null?"":new String(data);
        return "Message [clientId=" + clientId + ", topic=" + topic + ", data=" + s + "]";
    }

 

 

Constants.java


    public static int CLIENT_NETWORK_STATUS=0;
    
    public static final int SERVER_PORT = 6667;
    
    public static final int CLENT_READ_IDEL_TIME_OUT=0; // 读超时 20s
    public static final int CLENT_WRITE_IDEL_TIME_OUT=60; // 写超时 10s
    public static final int CLENT_ALL_IDEL_TIME_OUT=0;    // 所有超时 0s
    
    public static final int SERVER_READ_IDEL_TIME_OUT=61; // 读超时 20s
    public static final int SERVER_WRITE_IDEL_TIME_OUT=0;  // 写超时 10s
    public static final int SERVER_ALL_IDEL_TIME_OUT=0;    // 所有超时 0s
    
    
    public static final int MAX_LOSS_CONNECT_TIME=5;  // 最大失联次数
    
    public static final int RET_CONNECT_TIME_OUT=60;  // 客户端重连超时60s
    public static final int RET_CONNECT_TIME=1440;    // 客户端重连次数,重连一整天
    
    public static final String CHAR_SET="UTF-8";      //字符编码

    /*
     * 任务队列关键词
     */
    public static final String TASK_QUEUE_KEY = "NETTY_TASK_QUEUE";
    public static final long TASK_QUEUE_KEY_TIME_OUT=3600;   //任务数据过期时间(1小时)
    /*
     * 从客户ID获取机器编号
     */
    public static String getFactoryDevNo(String clientid){
        return clientid.substring(clientid.indexOf("_")+1, clientid.length());
    }
    /*
     * 由客户类型及机器码,构建客户ID
     */
    public static String getClientId(ClientType clientType,String factoryDevNo){
        return clientType.toString().toLowerCase()+"_"+factoryDevNo;
    }
 

 

    

application-netty.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- <context:annotation-config /> 
    <context:component-scan base-package="com.vendor.netty" init-method="start"/>
    -->
    <bean id="imServer" class="com.vendor.netty.server.NettyServer" init-method="start">
        <property name="port" value="8888"/>
    </bean>
</beans>

转载于:https://my.oschina.net/u/3238697/blog/3068187

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

netty服务端的代码 的相关文章

  • 如何在Java中优雅地处理SIGKILL信号

    当程序收到终止信号时如何处理清理 例如 我连接到一个应用程序 希望任何第三方应用程序 我的应用程序 发送finish注销时的命令 发送该信息最好说什么finish当我的应用程序被破坏时的命令kill 9 编辑1 kill 9无法被捕获 谢谢
  • JavaFX 2.0 FXML 子窗口

    经过多次搜索我发现了这个问题如何创建 javafx 2 0 应用程序 MDI https stackoverflow com questions 10915388 how to create a javafx 2 0 application
  • jq:根据 group by 计算嵌套对象值

    Json account 1 cost usage low totalcost 2 01 account 2 cost usage low totalcost 2 25 account 1 cost usage low totalcost
  • 在 Java 中从 SOAPMessage 获取原始 XML

    我已经在 J AX WS 中设置了 SOAP WebServiceProvider 但我无法弄清楚如何从 SOAPMessage 或任何 Node 对象获取原始 XML 下面是我现在获得的代码示例 以及我试图获取 XML 的位置 WebSe
  • 如何将现有的 SQLite3 数据库导入 Room?

    好吧 我在桌面上使用 SQLite3 创建了一个只需要读取的某些信息的数据库 我正在制作的应用程序不需要在此表中插入或删除信息 我在 Room 数据库层上做了相当多的谷歌搜索 所有文档都需要在构建应用程序时在 Room 中创建一个新的数据库
  • 如何为小程序提供对文件系统写入的访问权限

    我在设置小程序的策略文件时遇到问题 我是第一次这样做 不知道如何在java中设置小程序的策略文件 实际上我想授予小程序在文件系统上写入的权限 为此我必须向小程序授予文件权限 所以我创建了一个名为 java policy 的文件 并将以下代码
  • 将 jQuery 与 Selenium WebDriver 结合使用 - 如何将 JSON 对象转换为 WebElement?

    我正在使用 Selenium WebDriver 我想执行 jQuery 代码来查找一些元素 我的代码如下 public function uploadGrantDoc script return itemlist grant file u
  • java中的单链表和双向链表?

    在java中 哪个集合接口可以有效地实现单链表和双向链表 请问代码示例吗 毫不奇怪 实现双向链表的正确接口是 LinkedList 看Java文档 http docs oracle com javase 8 docs api java ut
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 使用 JAX-WS 的 WebLogic 中没有模式导入的单个 WSDL

    如何使用 JAX WS 配置由 WebLogic 10 3 6 生成的 Web 服务 以将对象架构包含在单个 WSDL 文件声明 而不是导入声明 中 示例代码 界面 import javax ejb Local Local public i
  • 需要正则表达式帮助

    我正在尝试替换两次或多次出现的 br like br br br 标签与两个一起 br br 具有以下模式 Pattern brTagPattern Pattern compile lt s br s s gt s 2 Pattern CA
  • LocalDate 减去 period 得到错误的结果

    LocalDate减去一个Period 如 28年1个月27天 得到错误的结果 但减去一个Period 只有天单位 如 10282 天 得到正确的结果 有什么需要注意的吗 public static void main String arg
  • 是否有最新的 Facebook Java SDK? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 好像没找到最近更新的 如果没有 是否有一个好的 Java 库来执行与 Facebook 的 API 交
  • 无法从 JQuery ajax 调用接收 JSON

    我已经确定来自服务器的 JSON 是有效的 手动进行 ajax 调用 但我真的很想使用 JQuery 我还使用 firebug 确定发送到服务器的 post URL 是正确的 但是 错误回调仍然被触发 解析错误 我还尝试了数据类型 文本 我
  • Java和手动执行finalize

    如果我打电话finalize 在我的程序代码中的一个对象上 JVM当垃圾收集器处理这个对象时仍然再次运行该方法吗 这是一个大概的例子 MyObject m new MyObject m finalize m null System gc 是
  • 如何清理 Runtime.exec() 中使用的用户输入?

    我需要通过命令行调用自定义脚本 这些脚本需要很少的参数并在 Linux 机器上调用 当前版本容易出现各种shell注入 如何清理用户给出的参数 参数包括登录名和路径 Unix 或 Windows 路径 用户应该能够输入任何可能的路径 该路径
  • Jackson 反序列化相当于 @JsonUnwrapped 吗?

    假设我有以下课程 public class Parent public int age JsonUnwrapped public Name name 生成 JSON age 18 first Joey last Sixpack 我如何将其反
  • Android Google 地图无法在当前主题中找到样式“mapViewStyle”

    添加谷歌地图视图时 我扩展了MapView 使用xml编辑器将其添加到活动中 并将我的谷歌地图api密钥手动添加到布局xml文件中 我的权限在清单文件中允许互联网 我想知道的是 在 xml 编辑器中 我收到错误 无法在当前主题中找到样式 m
  • 你能快速告诉我这个伪代码是否有意义吗?

    我相信我的代码现在是万无一失的 我现在将写出伪代码 但我确实有一个问题 为什么 DRJava 要求我返回 if 语句之外的内容 正如你所看到的 我为 ex 写了 return 1 只是因为它问了 但是它永远不会返回该值 谁可以给我解释一下这
  • 将数组值导出到 csv 文件 java

    我只需要帮助将数组元素导出到 csv 文件 我不知道我的代码有什么问题 任何帮助将不胜感激 谢谢 for int index 0 index lt cols length index FileWriter fw new FileWriter

随机推荐

  • 电脑浏览器换IP怎么换?四种浏览器教程一起看看吧。【芝麻http】

    浏览器设置代理IP上网是代理IP最基础的使用方法 不同的浏览器设置代理IP的方法略有不同 下面就几种常用的浏览器如何设置代理IP进行说明 一 IE浏览器设置代理IP IE浏览器无疑是最常用的浏览器 而像360浏览器 搜狗浏览器 QQ浏览器这
  • C++中的.和->

    C 中的 和 gt 1 C 中的点 的应用 如果是一个对象或者引用去调用成员变量或者成员函数函数的话 会使用到点 include
  • vue-cli webpack配置cdn路径 以及 上线之后的字体文件跨域处理

    昨天搞了一下vue项目打包之后静态资源走阿里云cdn 配置了半天 终于找到了设置的地方 config index js 里面设置build 下的 assetsPublicPath 打包的时候便可以添加公共前缀路径 assetsSubDire
  • c++ 封装

    目录 封装的意义一 封装的意义二 struct和class区别 成员属性设置为私有 封装的意义一 封装是c 面向对象的三大特征之一 1 将属性和行为作为一个整体 表现生活中的事物
  • 阿里云在线扩展云盘记录-ubuntu系统

    fdisk l 查看真实磁盘大小 apt get update 更新库 apt get install y cloud guest utils 安装命令 growpart dev vda 1 扩容命令在线扩展磁盘 若报错 执行以下操作 LA
  • C语言动态内存开辟,malloc,calloc,free,realloc函数使用

    目录 一 内存的动态分配 1 函数malloc 2 函数calloc 3 函数realloc 4 函数free 关于动态内存错误的操作案例 一 内存的动态分配 1 函数malloc 函数原型 void malloc size t size
  • 为什么抖音总显示连不上服务器,抖音登录不上怎么回事

    大家好 我是时间财富网智能客服时间君 上述问题将由我为大家进行解答 以抖音v12 5 0为例 抖音登录不上 一般来说都是由于网速过慢 无法连接抖音的服务器 网速过慢一般都是由于手机的信号过低 或者是处于在人群较多的地方 造成了手机的网速变慢
  • Euromap 63协议认识

    Euromap 63协议认识 一 用途 Euromap 63是欧洲塑料和橡胶机械制造商协会颁布的专用于注塑机和上位计算机进行数据交互的协议 全称 Euromap 63 SPI SPI 塑料工业协会 Euromap 63的目标是为不同制造商的
  • Arduino String.h库函数详解

    此库中包含 1 charAT 2 compareTo 3 concat 4 endsWith 5 equals 6 equalslgnoreCase 7 getBytes 8 indexOf 9 lastlndexOf 10 length
  • 控制理论个人学习笔记-非线性系统理论

    文章目录 非线性系统理论 非线性系统的一般概念 相平面基础 非线性系统的相平面分析 描述函数法基础 非线性系统的描述函数法分析 非线性系统理论 非线性系统的一般概念 典型非线性 死区 饱和 间隙 摩擦 继电特性 继电特性使得系统产生振荡 死
  • 利用Java访问WEB Service

    最近在学习Web Service 发现了一个国内的Web Service提供站点 其中最简单的是查询QQ在线状态服务 我通过Java直接发送SOAP请求文件访问Web Service成功 这种方式实现比较简单 不需要第三方的软件包 impo
  • STEP_7计数器相关

    计数器的使用
  • 阿里云服务器租用费用清单表(CPU内存带宽磁盘)

    阿里云服务器租用费用包括CPU内存 公网带宽和系统盘三部分 云服务器购买可以选择活动机型也可以选择自定义购买 活动机型配置固定选择不自由 自定义购买配置自由选择但是费用贵的一批 阿里云百科来详细说下云服务器1核2G 2核4G 4核8G 8核
  • VMware vSphere 6.7先睹为快

    vSphere是老朋友了 还用再多介绍吗 最新的好消息是 VMware vSphere推出了最新版本6 7 相较两年前推出的VMware vSphere 6 5版本 新增了很多强大的功能 作为业内领先的虚拟化和云平台 vSphere的一举一
  • nginx root&alias文件路径配置

    nginx指定文件路径有两种方式root和alias 这两者的用法区别 使用方法总结了下 方便大家在应用过程中 快速响应 root与alias主要区别在于nginx如何解释location后面的uri 这会使两者分别以不同的方式将请求映射到
  • 第4章 用GPT-2生成文本

    BERT 是基于双向 Transformer 结构构建 而 GPT 2 是基于单向 Transformer 这里的双向与单向 是指在进行注意力计算时 BERT会同时考虑被遮蔽词左右的词对其的影响 融合了双向上下文信息 它比较适合于文本生成类
  • IO流介绍和异常处理

    IO流 1 1IO的分类 根据数据的流向分为 输入流和输出流 输入流 把数据从其他设备上读取到内存中的流 输出流 把数据从内存中写到其他设备上的流 根据功能类型分为 字节流和字符流 字节流 以字节为单位 读写数据的流 字符流 以字符为单位
  • tomcat无法启动,也没找到错误日志

    最近做项目的时候 遇到一个问题 项目启动不了 并且没有任何错误日志 1 bug描述 在做项目的时候 启动Tomcat时报错 2 bug信息 Connected to server 2017 11 16 09 28 36 551 Artifa
  • Python:用tkinter制做一个音乐下载小软件

    人生苦短 我用Python 平常我们下载的歌曲 都是各种妖魔鬼怪的格式横行 想下载下来用一下都不行 还只能在它的播放器内听 这谁受得了 学Python是用来干嘛的 当然是解决问题咯 于是我直接写了一手音乐下载软件 强制全部保存mp3 这样就
  • netty服务端的代码

    client code 客户端的ChannelHandler集合 由子类实现 这样做的好处 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers 获取到handlers之后方便ChannelPipelin