Thrift |
您所在的位置:网站首页 › thrifty记忆 › Thrift |
Getting Started
如果有homebrew的话,直接执行以下命令即可,brew会处理相关依赖(https://thrift.apache.org/docs/install/)。 brew install thrift
或者可以从源码安装。 下载tar包 https://thrift.apache.org/download 参考 https://thrift.apache.org/docs/BuildingFromSource
先写一个例子,目录结构如下: ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ └── java └── thrift ├── Common.thrift └── ShopService.thrift
pom.xml中添加以下依赖: org.apache.thrift libthrift 0.10.0 org.projectlombok lombok 1.16.18 provided org.slf4j slf4j-simple 1.7.25
thrift目录下创建两个thrift文件: Common.thrift namespace java me.kavlez.thrift.service service BaseService { string echoServiceName() }
ShopService.thrift include "Common.thrift" namespace java me.kavlez.thrift.service struct Shop { 1: required i32 id, 2: required string name } struct Item { 1: required i32 id, 2: required string name = "unknown", 3: required string detail, 4: required Shop shop } service ShopService extends Common.BaseService { Shop queryShopInfo(1: i32 id), bool isValidShop(1: Shop shop), set queryItems(1: i32 shopId), }
Thrift提供了多个语言的生成器实现,按照thrift文件生成java类,生成代码命令的用法如下: thrift -r --gen
其中-r即recursive,如果在文件中通过include关键字引用了其他文件,-r选项可以一并生成被引用的文件。 例如上面ShopService.thrift中的: include Common.thrift
默认情况下,代码会在gen-目录下生成,生成目录可以通过--out指定。 生成后再拷贝有点麻烦,直接生成到代码目录下,在工程目录下执行以下命令: thrift -r --gen java --out src/main/java thrift/ShopService.thrift
执行后src/main/java/目录下生成me/kavlez/thrift/service/目录,以及4个java文件。
在service目录下创建impl,提供接口实现: package me.kavlez.thrift.service.impl; import lombok.extern.slf4j.Slf4j; import me.kavlez.thrift.service.Item; import me.kavlez.thrift.service.Shop; import me.kavlez.thrift.service.ShopService; import org.apache.thrift.TException; import java.util.Collections; import java.util.HashSet; import java.util.Set; /** * Created by [email protected] */ @Slf4j public class ShopServiceImpl implements ShopService.Iface { @Override public Shop queryShopInfo(int id) throws TException { return new Shop(id, "DMC_".concat(String.valueOf(id))); } @Override public boolean isValidShop(Shop shop) throws TException { return shop != null; } @Override public Set queryItems(int shopId) throws TException { if (shopId < 1) { return Collections.emptySet(); } Set items = new HashSet(); Shop shop = new Shop(1101, "DMC"); for (int i = 0; i < 8; i++) { Item item = new Item(shopId + i, "sample_".concat(String.valueOf(shopId + i)) , "this is sample_".concat(String.valueOf(i)) , shop); items.add(item); } return items; } @Override public String echoServiceName() throws TException { return "alo! this is shop service!"; } }
除了业务实现,我们需要额外做两件事情——构建Server和Client。 构建Server,也就是为Server指定Transparent、Protocol、Processor: package me.kavlez.thrift.server; import lombok.extern.slf4j.Slf4j; import me.kavlez.thrift.service.ShopService; import me.kavlez.thrift.service.impl.ShopServiceImpl; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; /** * Created by [email protected] */ @Slf4j public class SimpleServerHolder { public static TServer buildServer() { TServerSocket serverSocket = null; try { serverSocket = new TServerSocket(8081); } catch (TTransportException e) { e.printStackTrace(); } TProcessor tprocessor = new ShopService.Processor(new ShopServiceImpl()); TServer.Args tArgs = new TServer.Args(serverSocket); tArgs.protocolFactory(new TCompactProtocol.Factory()); tArgs.processor(tprocessor); TServer server = new TSimpleServer(tArgs); return server; } public static void main(String[] args) { TServer server = SimpleServerHolder.buildServer(); log.info("server ready..."); server.serve(); } }
相应地,构建Client: package me.kavlez.thrift.client; import lombok.extern.slf4j.Slf4j; import me.kavlez.thrift.service.Item; import me.kavlez.thrift.service.ShopService; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import java.util.Set; /** * Created by [email protected] */ @Slf4j public class SimpleClientHolder { private TTransport transport; public ShopService.Client buildClient(String serverAddr, int serverPort, int timeout) throws TException { this.transport = new TSocket(serverAddr, serverPort, timeout); TProtocol protocol = new TCompactProtocol(transport); transport.open(); ShopService.Client client = new ShopService.Client(protocol); return client; } public static void main(String[] args) { SimpleClientHolder simpleClientHolder = new SimpleClientHolder(); ShopService.Client client = null; try { client = simpleClientHolder.buildClient("localhost", 8081, 1000); Set items = client.queryItems(666); log.info("return items = {}", String.valueOf(items)); } catch (TException e) { e.printStackTrace(); } if (null != simpleClientHolder.transport) { simpleClientHolder.transport.close(); } } }依次运行Server和Client,输出正常。 IDL (Interface Description Language) 提供服务的第一步是用IDL编写Thrift文件,IDL几乎可以描述接口所需的所有元素,接口定义中包括以下内容: namespace 每个thrift文件都在自己的命名空间中,多个thrift文件可以用同一个命名空间作为标识,并指定要使用的语言的generator。 例如: namespace java me.kavlez.thrift.service namespace php tutorial基本类型 类型说明 bool 布尔类型 i8 (byte) 8-bit 有符号整型,对应java的byte i16 16-bit 有符号整型,对应java的short i32 32-bit 有符号整型,对应java的int i64 64-bit 有符号整型,对应java的long double 64-bit 浮点类型,对应java的double string 字符串 binary Blob (byte array) 结构体 用于定义一个对象类型。 字段默认为optional,可以声明required。 字段可以设置默认值。 结构体之间可以互相引用。 0.9.2开始可以引用自身。 struct Shop { 1: required i32 id, 2: required string name } struct Item { 1: required i32 id, 2: required string name = "unknown", 3: required string detail, 4: required Shop shop }枚举 值是可选项,枚举不能嵌套;基本上就是K、V的形式,不能描述太复杂的枚举类。 enum Numberz { ONE = 1, TWO, THREE, FIVE = 5, SIX, EIGHT = 8 }常量 可以自定义常量,像Map、List这样的复杂结构可以用json表示。 const i32 INT_CONST = 1234; // a const map MAP_CONST = {"hello": "world", "goodnight": "moon"} const list LIST_CONST = ["a","b","c"]容器类型 不支持异构容器,容器的元素类型必须一致。 元素类型可以是service以外的任何类型。 类型说明 map Map from one type to another list Ordered list of one type set Set of unique elements of one type自定义异常 语法上和struct相似,生成后的代码,不同语言各有各的实现方式。 exception IllegalShopException { 1: i32 errorCode, 2: string message, 3: Shop shop }service 一个函数集合,语法和java定义接口的语法类似,下面是一些例子。 service ThriftTest { /** * 无返回,空参数列表 */ void testVoid(), /** * 声明返回类型、参数 */ string testString(1: string thing), /** * 返回结构体 */ Shop queryShopInfo(1: i32 id), /** * 结构体作为参数 */ bool isValidShop(1: Shop shop), /** * ... */ set queryItems(1: i32 shopId), /** * 抛出异常 */ bool changeShopStatus(1: i32 shopId) throws(1: IllegalShopException err), /** * 多异常 */ bool changeItemStatus(1: i32 itemId) throws(1: IllegalShopException shopErr,2:IllegalItemException itemErr), /** * oneway表示该方法在客户端发起请求后不会等待响应,返回类型必须为void */ oneway void sendMessage(1:i32 shopId,2:string message) }thrift working stack 用Thrift构建服务和客户端,架构如下: +-------------------+ +-------------------+ | Server | | Client | | | | | | +---------------+ | | +---------------+ | | | | | | | | | | | your code | | | | your code | | | +---------------+ | | +---------------+ | | | Service | | | | Service | | | | processor | | | | Client | | | +---------------+ | | +---------------+ | | | | | | | | | | | Protocol | | | | Protocol | | | +---------------+ | | +---------------+ | | | | | | | | | | | Transport || Transport | | | +---------------+ | | +---------------+ | +-------------------+ +-------------------+
生成的接口类中大致包括三样,分别是Iface、Client、Processor。 另外还有Server、Transport、Protocol。 Transport 在RPC框架的语境下谈传输层很容易只想到网络通信,但Transport表述的并不只是网络通信。 不如说Transport是多种IO的抽象,其不仅限于网络IO。 比如,基础的TIOStreamTransport,以及其两个子类,TSocket和TZlibTransport。 TSocket在上面的例子中作为TBinaryProtocol依赖的transport类型,与Server的TServerSocket进行通信。 但后者是封装了InflaterInputStream和DeflaterOutputStream,其InputStream并不要求是SocketInputStream。 从开发角度来讲,如果将一个TMemoryBuffer对象传入Protocol,并以此创建某个service对应的Client,再调用相应接口。 整个过程在代码上并没有什么限制,只是运行时抛出org.apache.thrift.TApplicationException。
Protocol protocol依赖transport,决定双方以什么协议通信,同时也是通信内容的载体。 org.apache.thrift.protocol.TProtocol中的方法声明里,一系列readXX和writeXX,在具体实现中通常都是通过transport来完成。 以TJSONProtocol为例,其实现的TProtocol的所有write方法都是以几个私有的write方法组织起来。 比如,writeI32和writeI64都是通过私有方法writeJSONInteger,而writeJSONInteger则是由实例化时传入的trasnport进行write。
Processor 构建自己的server时需要在tArgs提供一个Processor,比如本文中的ShopService.Processor。 (p.s. 如果需要提供多个Processor,比如再加一个ItemService,则使用TMultiplexedProcessor即可。) Server通过Processor执行业务逻辑代码,文件中描述的每个函数作为ProcessFunction子类进行实例化,放入Processor的processMap中。 Server收到请求,从输入的protocol中读取方法名,根据方法名从processMap中拿到对应的ProcessFunction; 通过ProcessFunction的process方法执行业务逻辑,过程大体分为3步: 从protocol读入请求参数,构建参数对象; 传入参数,本地执行业务方法。假设方法名为"getItems",调用结果则为getItems_success; 将结果写入protocol,调用protocol.writeXX; Client像本文中,指定Transport和Protocol,构建ShopService.Client,客户端通过Client对象像调用本地方法一样调用queryItems; 在ShopService中,Client类同样实现了ShopService.Iface中的方法,以queryItems为例,其实现如下: public Shop queryShopInfo(int id) throws org.apache.thrift.TException { send_queryShopInfo(id); return recv_queryShopInfo(); }
在send_queryShopInfo,构建该函数对应的xx_args对象,将其写入oprot,并通过oprot.tranport进行flush; 相应地,recv_queryShopInfo就是从iport中读取函数的返回值,构建该函数对应的queryShopInfo_result对象。 Server 将Transport、Protocol和Processor集合在一起就是一个完整的Server,父类TServer提供了唯一的抽象方法——serve()。 以TSimpleServer为例,serve中通过java.lang.ServerSocket的accept获取client Socket并转为client Transport,以此获取相应的Processor、创建相应的inputTransport、outputTransport和iProt、oProt。 (p.s. 默认的TProcessorFactory没有子类,其getProcessor(Transport)和并没有通过transport来获取processor。可以用来扩展,比如用一个server提供多版本服务之类的。) 剩下的工作由Processor进行处理,从iPort读入请求信息并构造TMessage,找到相应的ProcessFunction并执行其process方法,这个在上面说过。 Thrift为TServer提供了3种实现: TSimpleServer: 单线程ServerSocket实现,仅用于测试; TThreadPoolServer: 封装了ThreadPoolExecutor,用内部类WorkerProcess表示单个请求,通过每个WorkerProcess对象的transport获取相应的Processor和Protocol,调用业务代码并返回; AbstractNonblockingServer: 非阻塞server抽象类,其serve()方法即整个过程的skeleton,serve()中调用的方法交给其子类提供具体实现。 public void serve() { // start any IO threads if (!startThreads()) { return; } // start listening, or exit if (!startListening()) { return; } setServing(true); // this will block while we serve waitForShutdown(); setServing(false); // do a little cleanup stopListening(); }
AbstractNonblockingServer的3个子类,分别为: TNonblockingServer: 实现父类的startThreads(),启动selector线程(也就是SelectAcceptThread,父类声明了protected final Selector selector),开始轮询SelectedKeys,检查状态并进行相应处理: if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { handleRead(key); } else if (key.isWritable()) { handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); }另外,使用TNonblockingServer时transport必须为TFramedTransport,以此保证能正确读取单次方法调用。 THsHaServer: "HsHa",即"Half-Sync/Half-Async",是TNonblockingServer的子类。 工作流程和TNonblockingServer相似,主要区别在与handleRead()。 handleRead中完成读取后,另外一项重要的工作就是requestInvoke(buffer),也就是执行processor.process(iProt,oProt)。 不过,TNonblockingServer是单线程执行,而THsHaServer则是通过线程池。 将FrameBuffer装进Invocation(其run方法即frameBuffer.invoke()),提交给线程池处理。 线程池参数的默认值如下: corePoolSize = 5; maximumPoolSize = Integer.MAX_VALUE; keepAliveTime = 60; workQueue = new LinkedBlockingQueue();TThreadedSelectorServer: 进一步加强HsHaServer,用一个AcceptThread接收所有连接请求,并担任负载均衡的角色。 负载均衡的工作由构造器参数中的SelectorThreadLoadBalancer进行,该类只提供了一种实现——对已注册的selector线程列表进行round robin。 AcceptThread处理连接时,通过SelectorThreadLoadBalancer选出selector线程,将接收到的socketChannel放入selector线程的队列中。 虽然TThreadedSelectorServer的requestInvoke也是使用线程池进行,但线程池的默认配置和THsHaServer不同,默认时为corePoolSize为5的FixedThreadPool。 如果corePoolSize小为0,则由caller线程执行。
最后,把之前的例子修改一下,看看效果。 AbstractTServerHolder.java package me.kavlez.thrift.server; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; public abstract class AbstractTServerHolder { private TServer tServer; public abstract TServer build() throws TTransportException; }ThreadedSelectorServerHolder.java package me.kavlez.thrift.server; import me.kavlez.thrift.service.ShopService; import me.kavlez.thrift.service.impl.ShopServiceImpl; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TTransportException; public class ThreadedSelectorServerHolder extends AbstractTServerHolder { @Override public TServer build() throws TTransportException { TNonblockingServerTransport transport = new TNonblockingServerSocket(8090); TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport); ShopService.Processor shopServiceProcessor = new ShopService.Processor(new ShopServiceImpl()); args.processor(shopServiceProcessor) .protocolFactory(new TBinaryProtocol.Factory()) .transportFactory(new TFramedTransport.Factory()); TServer server = new TThreadedSelectorServer(args); return server; } }Launcher.java package me.kavlez.thrift; import lombok.extern.slf4j.Slf4j; import me.kavlez.thrift.client.AbstractShopServiceClientHolder; import me.kavlez.thrift.client.NonBlockingClientHolder; import me.kavlez.thrift.client.ShopServiceClientAgent; import me.kavlez.thrift.server.AbstractTServerHolder; import me.kavlez.thrift.server.ThreadedSelectorServerHolder; import me.kavlez.thrift.service.Item; import me.kavlez.thrift.service.ShopService; import org.apache.thrift.TException; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import java.io.FileNotFoundException; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j public class Launcher { static class TServerClientHolderPair { private AbstractTServerHolder tServerHolder; private Class serverFuture = executorService.submit(new Runnable() { @Override public void run() { tServer.serve(); } }); Thread.sleep(100); int times = 10; final CountDownLatch countDownLatch = new CountDownLatch(times); class ShopServiceClientTask implements Runnable { @Override public void run() { AbstractShopServiceClientHolder clientHolder = null; clientHolder = new NonBlockingClientHolder(); try { ShopService.Iface shopService = new ShopServiceClientAgent(clientHolder.build()); for (int i = 0; i < 1000; i++) { Set items = shopService.queryItems(666); log.info("return items = {}", String.valueOf(items)); } } catch (TException e) { log.info("thread name={} get TException", Thread.currentThread().getName(), e); } finally { clientHolder.close(); countDownLatch.countDown(); } } } long start = System.currentTimeMillis(); for (int i = 0; i < times; i++) { executorService.submit(new ShopServiceClientTask()); } countDownLatch.await(); log.info("used {} ms ", System.currentTimeMillis() - start); tServer.setShouldStop(true); tServer.stop(); executorService.shutdown(); } } |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |