Route、Predicate与Filter的关系

您所在的位置:网站首页 route-policy和filter-policy的区别 Route、Predicate与Filter的关系

Route、Predicate与Filter的关系

2024-07-10 09:52| 来源: 网络整理| 查看: 265

Spring-Cloud-Gateway源码系列学习

版本 v2.2.6.RELEASE

Route、Predicate、Filter关系总览

Route、Predicate、Filter是Spring-Cloud-Gateway里面非常核心的三个概念,其中Route是Spring-Cloud-Gateway处理的基本单位,Route里面是包含Predicat和Filter的,都是一对多的关系。Filter分为GatewayFilter和GlobalFilter,其中GlobalFilter会作用于每个Route。以一个请求来简单描述它们的作用时机就是:

img

Route源码解析 public class Route implements Ordered { private final String id; //路由目标uri,最终被转发的目的地 private final URI uri; //序号,越小优先级越高,实现Ordered的getOrder方法返回order private final int order; //谓语,匹配Route的前置条件 private final AsyncPredicate predicate; //过滤器列表,比如可以修改请求头等 private final List gatewayFilters; //元信息,有两个值可以设置分别是response-timeout和connect-timeout //@see org.springframework.cloud.gateway.support.RouteMetadataUtils private final Map metadata; } Route与Predicate关系之Predicate是如何被装载到Route的

以RouteDefinitionRouteLocator#combinePredicates分析

private AsyncPredicate combinePredicates( RouteDefinition routeDefinition) { //获取Map裸的谓词配置信息 List predicates = routeDefinition.getPredicates(); if (predicates == null || predicates.isEmpty()) { // this is a very rare case, but possible, just match all return AsyncPredicate.from(exchange -> true); } //生产出第一个AsyncPredicate的谓词 AsyncPredicate predicate = lookup(routeDefinition, predicates.get(0)); for (PredicateDefinition andPredicate : predicates.subList(1, predicates.size())) { //生产出下一个谓词 AsyncPredicate found = lookup(routeDefinition, andPredicate); //使用AsyncPredicate#and讲两个谓词合并为一个 predicate = predicate.and(found); } //将所有谓词合并为一个后返回 return predicate; } private AsyncPredicate lookup(RouteDefinition route, PredicateDefinition predicate) { //获取对应的谓词工厂 RoutePredicateFactory factory = this.predicates.get(predicate.getName()); if (factory == null) { throw new IllegalArgumentException( "Unable to find RoutePredicateFactory with name " + predicate.getName()); } if (logger.isDebugEnabled()) { logger.debug("RouteDefinition " + route.getId() + " applying " + predicate.getArgs() + " to " + predicate.getName()); } //构造谓词必要信息传给断言工厂 // @formatter:off Object config = this.configurationService.with(factory) .name(predicate.getName()) .properties(predicate.getArgs()) .eventFunction((bound, properties) -> new PredicateArgsEvent( RouteDefinitionRouteLocator.this, route.getId(), properties)) .bind(); // @formatter:on //生产一个AsyncPredicate return factory.applyAsync(config); } Route与Filter关系之Filter是如何被装载到Route的

以RouteDefinitionRouteLocator#getFilters分析

private List getFilters(RouteDefinition routeDefinition) { List filters = new ArrayList(); // TODO: support option to apply defaults after route specific filters? //如果配置信息定义了defaultFilters,需要都加到filters里面 if (!this.gatewayProperties.getDefaultFilters().isEmpty()) { filters.addAll(loadGatewayFilters(DEFAULT_FILTERS, new ArrayList(this.gatewayProperties.getDefaultFilters()))); } if (!routeDefinition.getFilters().isEmpty()) { filters.addAll(loadGatewayFilters(routeDefinition.getId(), new ArrayList(routeDefinition.getFilters()))); } AnnotationAwareOrderComparator.sort(filters); return filters; } List loadGatewayFilters(String id, List filterDefinitions) { //定义一个已经设置order的GatewayFilter列表 ArrayList ordered = new ArrayList(filterDefinitions.size()); for (int i = 0; i new FilterArgsEvent( // TODO: why explicit cast needed or java compile fails RouteDefinitionRouteLocator.this, id, (Map) properties)) .bind(); // @formatter:on // some filters require routeId // TODO: is there a better place to apply this? if (configuration instanceof HasRouteId) { HasRouteId hasRouteId = (HasRouteId) configuration; //如果含有routeId则设置进去 hasRouteId.setRouteId(id); } //生产出GatewayFilter GatewayFilter gatewayFilter = factory.apply(configuration); //如果该GatewayFilter本身就实现Order,就直接添加到ordered列表 if (gatewayFilter instanceof Ordered) { ordered.add(gatewayFilter); } else { //否则转成OrderedGatewayFilter,order遍历的下标+1 ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1)); } } return ordered; } Route与Predicate关系之Predicate是如何帮助HandlerMapping匹配到Route的

以RoutePredicateHandlerMapping#lookupRoute分析

protected Mono lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes() // individually filter routes so that filterWhen error delaying is not a // problem //使用filterWhen进行过滤,返回Mono会留下 .concatMap(route -> Mono.just(route).filterWhen(r -> { // add the current route we are testing //作者测试用途 exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); //有了前面的合并AsyncPredicate,匹配Route只需要非常简单地apply,就直接把整个AsyncPredicate表达式都执行了 return r.getPredicate().apply(exchange); }) // instead of immediately stopping main flux due to error, log and // swallow it .doOnError(e -> logger.error( "Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) // .defaultIfEmpty() put a static Route not found // or .switchIfEmpty() // .switchIfEmpty(Mono.empty().log("noroute")) //取一个 .next() // TODO: error handling .map(route -> { if (logger.isDebugEnabled()) { logger.debug("Route matched: " + route.getId()); } validateRoute(route, exchange); //返回匹配的Route return route; }); /* * TODO: trace logging if (logger.isTraceEnabled()) { * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); } */ } Route与Filter关系之Filter是如何完成Route的愿望的

以NettyRoutingFilter#filter和NettyWriteResponseFilter#filter分析,其中NettyRoutingFilter是发送请求到Route的目标url,处理响应信息的status及header部分;NettyWriteResponseFilter负责处理响应信息的body部分,并发回给客户端。

public class NettyRoutingFilter implements GlobalFilter, Ordered { @Override @SuppressWarnings("Duplicates") public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { //拿出uri URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); //获取协议,如 https String scheme = requestUrl.getScheme(); //如果已经处理了,或者 不是 http 也 不是 https if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { //下一个filter /** * @see ForwardRoutingFilter#filter(org.springframework.web.server.ServerWebExchange, org.springframework.cloud.gateway.filter.GatewayFilterChain) */ return chain.filter(exchange); } //设置为已经处理了 setAlreadyRouted(exchange); //获取 request ServerHttpRequest request = exchange.getRequest(); //获取请求方法, 如POST final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); //获得ascii编码的url final String url = requestUrl.toASCIIString(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); boolean preserveHost = exchange .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); //获取route Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); //创建请求流 Flux responseFlux = getHttpClient(route, exchange) .headers(headers -> { headers.add(httpHeaders); // Will either be set below, or later by Netty headers.remove(HttpHeaders.HOST); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } }).request(method).uri(url).send((req, nettyOutbound) -> { //类似netty outbound, 可以在这里做编码工作 if (log.isTraceEnabled()) { nettyOutbound .withConnection(connection -> log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } return nettyOutbound.send(request.getBody().map(this::getByteBuf)); }).responseConnection((res, connection) -> { /** * 需要在这里做保存、处理header、status * * @see HttpClientResponse */ // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write // response later NettyWriteResponseFilter //保存res到exchange上下文,流到NettyWriteResponseFilter的.then把响应结果发回客户端 //把connection放到CLIENT_RESPONSE_ATTR /** * @see NettyWriteResponseFilter#filter(org.springframework.web.server.ServerWebExchange, org.springframework.cloud.gateway.filter.GatewayFilterChain) */ exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach( entry -> headers.add(entry.getKey(), entry.getValue())); //获取数据类型,并保存到ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } //把请求目标地址返回的status设置到exchange.getResponse setResponseStatus(res, response); // make sure headers filters run after setting status so it is // available in response //确保Response的HttpHeadersFilter在请求响应后执行 HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders .containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders .containsKey(HttpHeaders.CONTENT_LENGTH)) { // It is not valid to have both the transfer-encoding header and // the content-length header. // Remove the transfer-encoding header in the response if the // content-length header is present. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); return Mono.just(res); }); Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { //发起请求 responseFlux = responseFlux //设置响应超时时间 .timeout(responseTimeout, Mono.error(new TimeoutException( "Response took longer than timeout: " + responseTimeout))) //传播一个超时异常 .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); } /** * 把DataBuffer转成ByteBuf,用于netty请求目标地址 * @param dataBuffer * @return */ protected ByteBuf getByteBuf(DataBuffer dataBuffer) { if (dataBuffer instanceof NettyDataBuffer) { NettyDataBuffer buffer = (NettyDataBuffer) dataBuffer; return buffer.getNativeBuffer(); } // MockServerHttpResponse creates these else if (dataBuffer instanceof DefaultDataBuffer) { DefaultDataBuffer buffer = (DefaultDataBuffer) dataBuffer; return Unpooled.wrappedBuffer(buffer.getNativeBuffer()); } throw new IllegalArgumentException( "Unable to handle DataBuffer of type " + dataBuffer.getClass()); } /** * 把请求目标地址返回的status设置到exchange.getResponse * @param clientResponse * @param response */ private void setResponseStatus(HttpClientResponse clientResponse, ServerHttpResponse response) { HttpStatus status = HttpStatus.resolve(clientResponse.status().code()); if (status != null) { response.setStatusCode(status); } else { while (response instanceof ServerHttpResponseDecorator) { response = ((ServerHttpResponseDecorator) response).getDelegate(); } if (response instanceof AbstractServerHttpResponse) { ((AbstractServerHttpResponse) response) .setStatusCodeValue(clientResponse.status().code()); } else { // TODO: log warning here, not throw error? throw new IllegalStateException("Unable to set status code " + clientResponse.status().code() + " on response of type " + response.getClass().getName()); } } } /** * Creates a new HttpClient with per route timeout configuration. Sub-classes that * override, should call super.getHttpClient() if they want to honor the per route * timeout configuration. * * 创建一个基于Netty的HttpClient * * @param route the current route. * @param exchange the current ServerWebExchange. * @param chain the current GatewayFilterChain. * @return */ protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) { Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR); if (connectTimeoutAttr != null) { Integer connectTimeout = getInteger(connectTimeoutAttr); return this.httpClient.tcpConfiguration((tcpClient) -> tcpClient .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)); } return httpClient; } /** * 根据route的Metadata的CONNECT_TIMEOUT_ATTR获取连接超时时间 * @param connectTimeoutAttr * @return */ static Integer getInteger(Object connectTimeoutAttr) { Integer connectTimeout; if (connectTimeoutAttr instanceof Integer) { connectTimeout = (Integer) connectTimeoutAttr; } else { connectTimeout = Integer.parseInt(connectTimeoutAttr.toString()); } return connectTimeout; } /** * 根据route获取响应超时时间 * @param route * @return */ private Duration getResponseTimeout(Route route) { Object responseTimeoutAttr = route.getMetadata().get(RESPONSE_TIMEOUT_ATTR); Long responseTimeout = null; if (responseTimeoutAttr != null) { if (responseTimeoutAttr instanceof Number) { responseTimeout = ((Number) responseTimeoutAttr).longValue(); } else { responseTimeout = Long.valueOf(responseTimeoutAttr.toString()); } } return responseTimeout != null ? Duration.ofMillis(responseTimeout) : properties.getResponseTimeout(); } } public class NettyWriteResponseFilter implements GlobalFilter, Ordered { @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added // until the NettyRoutingFilter is run // @formatter:off /** * @see ForwardPathFilter#filter(org.springframework.web.server.ServerWebExchange, org.springframework.cloud.gateway.filter.GatewayFilterChain) */ return chain.filter(exchange) .doOnError(throwable -> cleanup(exchange)) .then(Mono.defer(() -> { /** * 从 exchange 中拿到 connection * @see NettyRoutingFilter#filter(org.springframework.web.server.ServerWebExchange, org.springframework.cloud.gateway.filter.GatewayFilterChain) */ Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); if (connection == null) { //如果connection为null,返回一个空的流 return Mono.empty(); } if (log.isTraceEnabled()) { log.trace("NettyWriteResponseFilter start inbound: " + connection.channel().id().asShortText() + ", outbound: " + exchange.getLogPrefix()); } ServerHttpResponse response = exchange.getResponse(); // TODO: needed? //创建body final Flux body = connection .inbound() .receive() .retain() .map(byteBuf -> wrap(byteBuf, response)); //获取内容格式 MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { if (log.isTraceEnabled()) { log.trace("invalid media type", e); } } //根据内容格式是不是流,采用不同处理 return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })).doOnCancel(() -> cleanup(exchange)); // @formatter:on } //ByteBuf 转成 DataBuffer protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) { DataBufferFactory bufferFactory = response.bufferFactory(); if (bufferFactory instanceof NettyDataBufferFactory) { NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory; return factory.wrap(byteBuf); } // MockServerHttpResponse creates these else if (bufferFactory instanceof DefaultDataBufferFactory) { DataBuffer buffer = ((DefaultDataBufferFactory) bufferFactory) .allocateBuffer(byteBuf.readableBytes()); buffer.write(byteBuf.nioBuffer()); byteBuf.release(); return buffer; } throw new IllegalArgumentException( "Unkown DataBufferFactory type " + bufferFactory.getClass()); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3