JAVA实现心跳机制

您所在的位置:网站首页 socket为什么要发心跳包 JAVA实现心跳机制

JAVA实现心跳机制

2024-07-16 11:06| 来源: 网络整理| 查看: 265

1.心跳机制简介

    在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。

    发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。

    一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。

2.心跳机制的实现方式

方式一:心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。

方式二:应用层自己实现

1.Client使用定时器,不断发送心跳; 2.Server收到心跳后,回复一个包; 3.Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。

3.java实现心跳机制:这里基于Java实现的简单RPC框架实现心跳机制

1.结构图

动态代理RPCClient

package com.heart.demo.entity; import org.springframework.cglib.proxy.InvocationHandler; import org.springframework.cglib.proxy.Proxy; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.Socket; /** * 动态代理 * @param */ public class RPCClient { public static T getRemoteProxyObj(final Class serviceInterface, final InetSocketAddress addr) { // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try { // 2.创建Socket客户端,根据指定地址连接远程服务提供者 socket = new Socket(); socket.connect(addr); // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者 output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 4.同步阻塞等待服务器返回应答,获取应答后返回 input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } finally { if (socket != null) socket.close(); if (output != null) output.close(); if (input != null) input.close(); } } }); } } 服务器接受心跳包返回的命令对象类 package com.heart.demo.entity; import java.io.Serializable; import java.util.HashMap; import java.util.Map; /** * 服务器接受心跳包返回的命令对象类: */ public class Cmder implements Serializable { private String nodeID; private String error; private Map info = new HashMap(); public String getNodeID() { return nodeID; } public void setNodeID(String nodeID) { this.nodeID = nodeID; } public String getError() { return error; } public void setError(String error) { this.error = error; } public Map getInfo() { return info; } public void setInfo(Map info) { this.info = info; } } 心跳包实体类 package com.heart.demo.entity; import java.io.Serializable; import java.util.HashMap; import java.util.Map; /** * 心跳包实体类 */ public class HeartbeatEntity implements Serializable { private long time; private String nodeID; private String error; private Map info = new HashMap(); public String getNodeID() { return nodeID; } public void setNodeID(String nodeID) { this.nodeID = nodeID; } public String getError() { return error; } public void setError(String error) { this.error = error; } public Map getInfo() { return info; } public void setInfo(Map info) { this.info = info; } public long getTime() { return time; } public void setTime(long time) { this.time = time; } } 心跳监听保存信息 package com.heart.demo.listener; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** * 心跳监听保存信息 * * @author cang * @create_time 2016-09-28 11:40 */ public class HeartbeatLinstener { private ExecutorService executor = Executors.newFixedThreadPool(20); private final ConcurrentHashMap nodes = new ConcurrentHashMap(); private final ConcurrentHashMap nodeStatus = new ConcurrentHashMap(); private long timeout = 10 * 1000; // 服务器监听端口 private int port = 8089; // 单例模式 private static class SingleHolder { private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener(); } private HeartbeatLinstener() { } public static HeartbeatLinstener getInstance() { return SingleHolder.INSTANCE; } public ConcurrentHashMap getNodes() { return nodes; } public void registerNode(String nodeId, Object nodeInfo) { nodes.put(nodeId, nodeInfo); nodeStatus.put(nodeId, System.currentTimeMillis()); } public void removeNode(String nodeID) { if (nodes.containsKey(nodeID)) { nodes.remove(nodeID); } } // 检测节点是否有效 public boolean checkNodeValid(String key) { if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false; if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false; return true; } // 删除所有失效节点 public void removeInValidNode() { Iterator it = nodeStatus.entrySet().iterator(); while (it.hasNext()) { Map.Entry e = it.next(); if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) { nodes.remove(e.getKey()); } } } }

处理器

package com.heart.demo.handler; import com.heart.demo.entity.Cmder; import com.heart.demo.entity.HeartbeatEntity; public interface HeartbeatHandler { public Cmder sendHeartBeat(HeartbeatEntity info); } package com.heart.demo.handler.impl; import com.heart.demo.entity.Cmder; import com.heart.demo.entity.HeartbeatEntity; import com.heart.demo.handler.HeartbeatHandler; import com.heart.demo.listener.HeartbeatLinstener; import java.util.Map; public class HeartbeatHandlerImpl implements HeartbeatHandler { public Cmder sendHeartBeat(HeartbeatEntity info) { HeartbeatLinstener linstener = HeartbeatLinstener.getInstance(); // 添加节点 if (!linstener.checkNodeValid(info.getNodeID())) { linstener.registerNode(info.getNodeID(), info); } // 其他操作 Cmder cmder = new Cmder(); cmder.setNodeID(info.getNodeID()); // ... System.out.println("current all the nodes: "); Map nodes = linstener.getNodes(); for (Map.Entry e : nodes.entrySet()) { System.out.println(e.getKey() + " : " + e.getValue()); } System.out.println("hadle a heartbeat"); return cmder; } }

客户端

package com.heart.demo.client; import com.heart.demo.entity.Cmder; import com.heart.demo.entity.HeartbeatEntity; import com.heart.demo.entity.RPCClient; import com.heart.demo.handler.HeartbeatHandler; import java.net.InetSocketAddress; import java.util.UUID; /** * 心跳客户端 */ public class HeartbeatClient implements Runnable { private String serverIP = "127.0.0.1"; private int serverPort = 8089; private String nodeID = UUID.randomUUID().toString(); private boolean isRunning = true; //最近的心跳时间 private long lastHeartbeat; //心跳间隔时间 10s private long heartBeatInterval = 10*1000; @Override public void run() { try { while(isRunning){ HeartbeatHandler handler = RPCClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort)); long startTime = System.currentTimeMillis(); //是否达到发送心跳的周期时间 if(startTime - lastHeartbeat > heartBeatInterval){ System.out.println("send a heart beat"); lastHeartbeat = startTime; HeartbeatEntity heartbeatEntity = new HeartbeatEntity(); heartbeatEntity.setTime(startTime); heartbeatEntity.setNodeID(nodeID); //向服务器发送心跳,并返回需要执行的命令 Cmder cmder = handler.sendHeartBeat(heartbeatEntity); if(!processCommand(cmder)){ continue; } } } } catch (Exception e) { e.printStackTrace(); } } /** * 通过指令 * @param cmder * @return */ private boolean processCommand(Cmder cmder) { System.out.println("nodeId"+cmder.getNodeID()); return true; } }

服务注册

package com.heart.demo.server; import com.heart.demo.listener.HeartbeatLinstener; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** * 服务注册中心 */ public class ServiceCenter { private ExecutorService executor = Executors.newFixedThreadPool(20); private final ConcurrentHashMap serviceRegistry = new ConcurrentHashMap(); private AtomicBoolean isRunning = new AtomicBoolean(true); // 服务器监听端口 private int port = 8089; // 心跳监听器 HeartbeatLinstener linstener; // 单例模式 private static class SingleHolder { private static final ServiceCenter INSTANCE = new ServiceCenter(); } private ServiceCenter() { } public static ServiceCenter getInstance() { return SingleHolder.INSTANCE; } public void register(Class serviceInterface, Class impl) { System.out.println("regeist service " + serviceInterface.getName()); serviceRegistry.put(serviceInterface.getName(), impl); } public void start() throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(port)); System.out.println("start server"); linstener = HeartbeatLinstener.getInstance(); System.out.println("start listen heart beat"); try { while (true) { // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行 executor.execute(new ServiceTask(server.accept())); } } finally { server.close(); } } public void stop() { isRunning.set(false); executor.shutdown(); } public boolean isRunning() { return isRunning.get(); } public int getPort() { return port; } public void settPort(int port) { this.port = port; } public ConcurrentHashMap getServiceRegistry() { return serviceRegistry; } private class ServiceTask implements Runnable { Socket clent = null; public ServiceTask(Socket client) { this.clent = client; } public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果 input = new ObjectInputStream(clent.getInputStream()); String serviceName = input.readUTF(); String methodName = input.readUTF(); Class[] parameterTypes = (Class[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Class serviceClass = serviceRegistry.get(serviceName); if (serviceClass == null) { throw new ClassNotFoundException(serviceName + " not found"); } Method method = serviceClass.getMethod(methodName, parameterTypes); Object result = method.invoke(serviceClass.newInstance(), arguments); // 3.将执行结果反序列化,通过socket发送给客户端 output = new ObjectOutputStream(clent.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if (output != null) { try { output.close(); } catch (IOException e) { e.printStackTrace(); } } if (input != null) { try { input.close(); } catch (IOException e) { e.printStackTrace(); } } if (clent != null) { try { clent.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }

测试

package com.heart.demo; import com.heart.demo.client.HeartbeatClient; import com.heart.demo.handler.HeartbeatHandler; import com.heart.demo.handler.impl.HeartbeatHandlerImpl; import com.heart.demo.server.ServiceCenter; import java.io.IOException; public class HeartbeatTest { public static void main(String[] args) { new Thread(new Runnable() { public void run() { try { ServiceCenter serviceServer = ServiceCenter.getInstance(); serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class); serviceServer.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); Thread client1 = new Thread(new HeartbeatClient()); client1.start(); Thread client2 = new Thread(new HeartbeatClient()); client2.start(); } }

 转载:https://www.cnblogs.com/codingexperience/p/5939059.html



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3