Idea+maven+spring

2023-03-14 12:50 | Source: compiled from the web

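Preface: Dubbo is an RPC communication component. Two questions come up when using it: how do we keep a service provider from being overwhelmed by many consumers, and how does a consumer avoid response timeouts caused by a failing provider?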

1 Rate-limiting measures on the provider side:

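1.1 Use the dubbo.protocol.accepts parameter to limit how many consumer connections the provider accepts at the same time. A value of 0 means the number of connections is not limited.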

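1.2 Use dubbo.provider.executes to limit the maximum number of requests that each method may execute in parallel. It can also be set at the class level, which overrides the global configuration: @DubboService(executes = 2)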
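The override rule described above can be pictured with a small, self-contained sketch. It is only a simplified model of "the more specific setting wins": `ExecutesResolver` and its `resolve` method are hypothetical names invented for illustration, and this is not how Dubbo itself merges configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; this is not Dubbo's configuration code.
class ExecutesResolver {

    /**
     * Returns the service-level "executes" value when one is present,
     * otherwise the provider-wide default.
     */
    static int resolve(Map<String, String> serviceParams, int providerDefault) {
        String serviceLevel = serviceParams.get("executes");
        return serviceLevel != null ? Integer.parseInt(serviceLevel) : providerDefault;
    }

    public static void main(String[] args) {
        Map<String, String> annotated = new HashMap<>();
        // As if the class carried @DubboService(executes = 2)
        annotated.put("executes", "2");
        // The service-level value is used
        System.out.println(resolve(annotated, 10));
        // No service-level value, so the provider default applies
        System.out.println(resolve(new HashMap<String, String>(), 10));
    }
}
```

In this model a service without its own value simply inherits the provider-wide limit.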

2 Circuit-breaking measures on the consumer side:

2.1 Define the mock return value on the consumer's service reference (the proxy):

    @DubboReference(mock = "return null")
    private DubboThreeService threeService;

    @RequestMapping(value = "/dubbo-three", method = RequestMethod.GET)
    public String indexTestThree() {
        Object obj = threeService.testOne("123");
        return String.valueOf(obj);
    }

When the call to the provider fails, the method returns null directly instead of propagating the error.

The test interface DubboThreeService:

    public interface DubboThreeService {
        String testOne(String token);
    }

2.2 Define an exception class for the consumer's reference to throw:

    @DubboReference(mock = "throw org.lgx.bluegrass.api.common.BizException")
    private DubboThreeService threeService;

The exception class, BizException:

    import java.util.Map;
    import lombok.Data;

    @Data
    public class BizException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        protected int errorCode;
        protected String errorMessage;
        private Map errorMap;

        // The mock creates the exception by class name, so a no-arg constructor is provided
        public BizException() {
            super();
            this.errorCode = 500_001;
            this.errorMessage = "dubbo exception";
        }
    }

2.3 Handling a single service reference (proxy):

1) Specify the mock class by its fully qualified name:

    @DubboReference(mock = "org.lgx.bluegrass.api.service.impl.DubboTestServiceImpl")
    private DubboTestService dubboTestService;

The test interface, DubboTestService:

    public interface DubboTestService {
        // Method exposed as a Dubbo service
        String test(String token);
    }

An implementation class inside the consumer application returns the result in place of the remote call:

    public class DubboTestServiceImpl implements DubboTestService {
        @Override
        public String test(String token) {
            return "hello !";
        }
    }

2) Define a class DubboThreeServiceMock in the same package as the DubboThreeService interface. The naming rule for the mock class is: name of the referenced interface + Mock.

    public class DubboThreeServiceMock implements DubboThreeService {
        @Override
        public String testOne(String token) {
            return "mock hello";
        }
    }

The mock attribute on the reference can then omit the class path of the default implementation:

    @DubboReference(mock = "true")
    private DubboThreeService threeService;
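To make the naming rule concrete, here is a minimal sketch of how a setting of "true" could be turned into a class name and instantiated by reflection. All names in it (`GreetingService`, `GreetingServiceMock`, `MockNameResolver`) are hypothetical and exist only for this example; only the "interface name + Mock" rule comes from the text above, and Dubbo's real lookup code is more involved.

```java
// Hypothetical service interface used only for this illustration.
interface GreetingService {
    String greet(String name);
}

// Follows the naming rule: interface name + "Mock", in the same package.
class GreetingServiceMock implements GreetingService {
    @Override
    public String greet(String name) {
        return "mock hello";
    }
}

class MockNameResolver {

    /** Maps the mock setting to the name of the class that should be loaded. */
    static String resolveClassName(String mock, Class<?> serviceInterface) {
        if ("true".equalsIgnoreCase(mock)) {
            return serviceInterface.getName() + "Mock";
        }
        // Any other value is treated here as an explicit class name
        return mock;
    }

    /** Loads and instantiates the mock, checking that it implements the interface. */
    static <T> T newMock(String mock, Class<T> serviceInterface) {
        String className = resolveClassName(mock, serviceInterface);
        try {
            Class<?> mockClass = Class.forName(className);
            if (!serviceInterface.isAssignableFrom(mockClass)) {
                throw new IllegalStateException(className + " does not implement " + serviceInterface.getName());
            }
            return serviceInterface.cast(mockClass.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to create mock implementation class " + className, e);
        }
    }

    public static void main(String[] args) {
        GreetingService service = newMock("true", GreetingService.class);
        System.out.println(service.greet("123"));
    }
}
```

Because the name is derived from the interface, the mock class has to sit in the same package as the interface and needs a no-argument constructor for this kind of lookup to work.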

3 How rate limiting is implemented:

Concurrent request limiting: dubbo.provider.executes is enforced by ExecuteLimitFilter.

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    package org.apache.dubbo.rpc.filter;

    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.common.extension.Activate;
    import org.apache.dubbo.rpc.Filter;
    import org.apache.dubbo.rpc.Invocation;
    import org.apache.dubbo.rpc.Invoker;
    import org.apache.dubbo.rpc.Result;
    import org.apache.dubbo.rpc.RpcException;
    import org.apache.dubbo.rpc.RpcStatus;
    import org.apache.dubbo.rpc.Filter.Listener;

    @Activate(
        group = {"provider"},
        value = {"executes"}
    )
    public class ExecuteLimitFilter implements Filter, Listener {
        private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";

        public ExecuteLimitFilter() {
        }

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            // URL of the service
            URL url = invoker.getUrl();
            // Name of the method being invoked
            String methodName = invocation.getMethodName();
            // Maximum concurrency configured for the method; the default is 0,
            // which beginCount treats as Integer.MAX_VALUE (no limit)
            int max = url.getMethodParameter(methodName, "executes", 0);
            // Check how many requests this method is currently executing;
            // if the limit would be exceeded, throw immediately
            if (!RpcStatus.beginCount(url, methodName, max)) {
                // 7 is RpcException.LIMIT_EXCEEDED_EXCEPTION
                throw new RpcException(7, "Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than limited.");
            } else {
                invocation.put("execute_limit_filter_start_time", System.currentTimeMillis());

                try {
                    return invoker.invoke(invocation);
                } catch (Throwable var7) {
                    if (var7 instanceof RuntimeException) {
                        throw (RuntimeException)var7;
                    } else {
                        throw new RpcException("unexpected exception when ExecuteLimitFilter", var7);
                    }
                }
            }
        }

        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            // The call has finished: decrement the concurrency counter
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), this.getElapsed(invocation), true);
        }

        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (t instanceof RpcException) {
                RpcException rpcException = (RpcException)t;
                // A rejected request never incremented the counter, so there is nothing to decrement
                if (rpcException.isLimitExceed()) {
                    return;
                }
            }

            // The call has finished: decrement the concurrency counter
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), this.getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            Object beginTime = invocation.get("execute_limit_filter_start_time");
            return beginTime != null ? System.currentTimeMillis() - (Long)beginTime : 0L;
        }
    }

The check performed by RpcStatus.beginCount:

    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<>();

    public static boolean beginCount(URL url, String methodName, int max) {
        // 0 or a negative value means no limit
        max = max <= 0 ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.get() == Integer.MAX_VALUE) {
            return false;
        } else {
            int i;
            do {
                // Read the number of calls currently executing for this method;
                // return false if one more would exceed the configured limit
                i = methodStatus.active.get();
                if (i + 1 > max) {
                    return false;
                }
            } while (!methodStatus.active.compareAndSet(i, i + 1));

            appStatus.active.incrementAndGet();
            return true;
        }
    }

    public static RpcStatus getStatus(URL url) {
        String uri = url.toIdentityString();
        // RpcStatus recorded at the application (service) level
        return SERVICE_STATISTICS.computeIfAbsent(uri, key -> new RpcStatus());
    }

    public static void removeStatus(URL url) {
        String uri = url.toIdentityString();
        SERVICE_STATISTICS.remove(uri);
    }

    public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
        // RpcStatus recorded at the method level
        return map.computeIfAbsent(methodName, k -> new RpcStatus());
    }
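The counting technique used here, a compare-and-set loop on an atomic counter, can be tried in isolation. The following is a minimal sketch without any Dubbo dependency; `MethodConcurrencyLimiter` and its method names are made up for this example and keep only a per-method counter, whereas RpcStatus also tracks service-level statistics and timings.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone limiter; a simplified model of the idea, not Dubbo's RpcStatus.
class MethodConcurrencyLimiter {
    private final ConcurrentMap<String, AtomicInteger> active = new ConcurrentHashMap<>();

    /** Tries to reserve one execution slot; returns false when the limit is reached. */
    boolean beginCount(String methodName, int max) {
        // 0 or a negative value means "no limit" in this sketch
        int limit = max <= 0 ? Integer.MAX_VALUE : max;
        AtomicInteger counter = active.computeIfAbsent(methodName, k -> new AtomicInteger());
        while (true) {
            int current = counter.get();
            if (current >= limit) {
                return false;
            }
            // Only succeeds if no other thread changed the counter in the meantime
            if (counter.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Releases a slot that was previously reserved with beginCount. */
    void endCount(String methodName) {
        AtomicInteger counter = active.get(methodName);
        if (counter != null) {
            counter.decrementAndGet();
        }
    }

    int activeCount(String methodName) {
        AtomicInteger counter = active.get(methodName);
        return counter == null ? 0 : counter.get();
    }

    public static void main(String[] args) {
        MethodConcurrencyLimiter limiter = new MethodConcurrencyLimiter();
        if (limiter.beginCount("testOne", 2)) {
            try {
                System.out.println("executing, active = " + limiter.activeCount("testOne"));
            } finally {
                // Always release the slot, also when the call throws
                limiter.endCount("testOne");
            }
        }
    }
}
```

The loop retries instead of locking: a request is rejected only when the counter it just read is already at the limit, and a failed compare-and-set simply means another thread got there first.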

4 How circuit breaking is implemented:

Circuit breaking is handled mainly by substituting mock data, in MockClusterInvoker:

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // Read the mock setting for this method
        String value = this.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
        // A mock is configured and it is not "false"
        if (value.length() != 0 && !"false".equalsIgnoreCase(value)) {
            // force: skip the remote call and go straight to the local mock
            if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.getUrl());
                }

                result = this.doMockInvoke(invocation, (RpcException)null);
            } else {
                try {
                    // Not forced: make the remote call first
                    result = this.invoker.invoke(invocation);
                    if (result.getException() != null && result.getException() instanceof RpcException) {
                        RpcException rpcException = (RpcException)result.getException();
                        // Business exceptions are passed on to the caller unchanged
                        if (rpcException.isBiz()) {
                            throw rpcException;
                        }

                        // The remote call failed: fall back to the local mock
                        result = this.doMockInvoke(invocation, rpcException);
                    }
                } catch (RpcException var5) {
                    // The call threw an exception
                    if (var5.isBiz()) {
                        throw var5;
                    }

                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.getUrl(), var5);
                    }

                    // Fall back to the local mock
                    result = this.doMockInvoke(invocation, var5);
                }
            }
        } else {
            // No mock configured, or mock is "false": call the remote method directly
            result = this.invoker.invoke(invocation);
        }

        return result;
    }
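The three paths in this method (no mock, forced mock, mock on failure) can be reproduced in a few lines of plain Java. This is a sketch under simplifying assumptions: `RemoteCallException` and `MockFallback` are hypothetical stand-ins for RpcException and MockClusterInvoker, and the remote call and the local mock are modelled as plain `Supplier` objects.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for RpcException: only carries the "business exception" flag.
class RemoteCallException extends RuntimeException {
    private final boolean biz;

    RemoteCallException(String message, boolean biz) {
        super(message);
        this.biz = biz;
    }

    boolean isBiz() {
        return biz;
    }
}

// Hypothetical helper modelling the decision flow; not Dubbo code.
class MockFallback {

    static <T> T invoke(String mock, Supplier<T> remote, Supplier<T> localMock) {
        String value = mock == null ? "" : mock.trim();
        // No mock configured, or mock=false: plain remote call
        if (value.isEmpty() || "false".equalsIgnoreCase(value)) {
            return remote.get();
        }
        // force: never touch the remote side
        if (value.startsWith("force")) {
            return localMock.get();
        }
        try {
            return remote.get();
        } catch (RemoteCallException e) {
            // Business errors belong to the caller and are not masked by the mock
            if (e.isBiz()) {
                throw e;
            }
            return localMock.get();
        }
    }

    public static void main(String[] args) {
        String result = MockFallback.<String>invoke(
                "true",
                () -> { throw new RemoteCallException("timeout", false); },
                () -> "mock hello");
        System.out.println(result);
    }
}
```

Keeping business exceptions out of the fallback path matters: a mock should hide infrastructure failures such as timeouts, not swallow errors that the provider raised on purpose.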

MockInvoker, which carries out the local mock call:

    public Result invoke(Invocation invocation) throws RpcException {
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(this);
        }

        // Read the mock definition: method level first, then service level
        String mock = null;
        if (this.getUrl().hasMethodParameter(invocation.getMethodName())) {
            mock = this.getUrl().getParameter(invocation.getMethodName() + "." + "mock");
        }

        if (StringUtils.isBlank(mock)) {
            mock = this.getUrl().getParameter("mock");
        }

        if (StringUtils.isBlank(mock)) {
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + this.url));
        } else {
            mock = normalizeMock(URL.decode(mock));
            // "return ...": parse the value after "return" and return it
            if (mock.startsWith("return ")) {
                mock = mock.substring("return ".length()).trim();

                try {
                    Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                    Object value = parseMockValue(mock, returnTypes);
                    return AsyncRpcResult.newDefaultAsyncResult(value, invocation);
                } catch (Exception var5) {
                    throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + this.url, var5);
                }
            } else if (mock.startsWith("throw")) {
                // "throw ...": throw the configured exception directly
                mock = mock.substring("throw".length()).trim();
                if (StringUtils.isBlank(mock)) {
                    throw new RpcException("mocked exception for service degradation.");
                } else {
                    Throwable t = getThrowable(mock);
                    // 3 is RpcException.BIZ_EXCEPTION
                    throw new RpcException(3, t);
                }
            } else {
                try {
                    // Otherwise treat the value as a class name and call the local implementation
                    Invoker<T> invoker = this.getInvoker(mock);
                    return invoker.invoke(invocation);
                } catch (Throwable var6) {
                    throw new RpcException("Failed to create mock implementation class " + mock, var6);
                }
            }
        }
    }
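The branching on the mock string can be isolated into a small classifier. The sketch below uses a hypothetical `MockExpression` class that only decides which of the three forms a setting has; it deliberately skips what the real code does afterwards (normalizing the string, converting the return value to the method's return type, instantiating the exception or the mock class).

```java
// Hypothetical classifier for mock settings; a simplified model, not Dubbo code.
class MockExpression {

    enum Kind { RETURN_VALUE, THROW_EXCEPTION, LOCAL_CLASS }

    final Kind kind;
    // Text after "return"/"throw", or the class name for LOCAL_CLASS
    final String payload;

    private MockExpression(Kind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }

    static MockExpression parse(String mock) {
        if (mock == null || mock.trim().isEmpty()) {
            throw new IllegalArgumentException("mock can not be blank");
        }
        String value = mock.trim();
        if (value.startsWith("return ")) {
            return new MockExpression(Kind.RETURN_VALUE, value.substring("return ".length()).trim());
        }
        if (value.startsWith("throw")) {
            // An empty payload means "throw a generic degradation exception"
            return new MockExpression(Kind.THROW_EXCEPTION, value.substring("throw".length()).trim());
        }
        return new MockExpression(Kind.LOCAL_CLASS, value);
    }

    public static void main(String[] args) {
        String[] samples = {
            "return null",
            "throw org.lgx.bluegrass.api.common.BizException",
            "org.lgx.bluegrass.api.service.impl.DubboTestServiceImpl"
        };
        for (String sample : samples) {
            MockExpression parsed = parse(sample);
            System.out.println(parsed.kind + " -> " + parsed.payload);
        }
    }
}
```

The three samples correspond to the three configurations shown in sections 2.1, 2.2 and 2.3.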

