Live555 分析(一):类介绍

您所在的位置:网站首页 delay可数嘛 Live555 分析(一):类介绍

Live555 分析(一):类介绍

2023-02-28 22:28| 来源: 网络整理| 查看: 265

  从程序的结构来看,live项目包括了四个基本库、程序入口类(在mediaServer中)和一些测试代码(在testProgs中)。

  四个基本静态库是UsageEnvironment、BasicUsageEnvironment、groupsock和liveMedia。

UsageEnvironment:

  包括抽象类UsageEnvironment和抽象类TaskScheduler,这两个类用于事件调度,其中包括实现了对事件的异步读取、对事件句柄的设置及对错误信息的输出等;该库中还有一个HashTable,这是一个通用的HashTable,在整个项目中都可以使用它,当然该HashTable也是一个抽象类。

BasicUsageEnvironment:

  主要是对UsageEnvironment中对应类的实现。 

  BasicUsageEnvironment和UsageEnvironment中的类都是用于整个系统的基础功能类.比如UsageEnvironment代表了整个系统运行的环境,它提供了错误记录和错误报告的功能,无论哪一个类要输出错误,就需要保存UsageEnvironment的指针.而TaskScheduler则提供了任务调度功能.整个程序的运行发动机就是它,它调度任务,执行任务(任务就是一个函数).TaskScheduler由于在全局中只有一个,所以保存在了UsageEnvironment中.而所有的类又都保存了UsageEnvironment的指针,所以谁想把自己的任务加入调度中,那是很容易的.在此还看到一个结论:整个live555(服务端)只有一个线程.

  BasicUsageEnvironment和UsageEnvironment中的类初始化:

// Begin by setting up our usage environment: TaskScheduler* scheduler = BasicTaskScheduler::createNew(); env = BasicUsageEnvironment::createNew(*scheduler); 

  在服务端中的main()函数,可见其创建一个RTSPServer类实例后,和在客服端的main函数连接成功,开始播放后,即进入相同的一个函数:

env->taskScheduler().doEventLoop(); // does not return

  在函数定义在BasicTaskScheduler0.cpp中:

void BasicTaskScheduler0::doEventLoop(char* watchVariable) { // Repeatedly loop, handling readble sockets and timed events: while (1) { if (watchVariable != NULL && *watchVariable != 0) break; SingleStep(); } }

  BasicTaskScheduler0从TaskScheduler派生,所以还是一个任务调度对象,所以依然说明任务调度对象是整个程序的发动机。循环中每次走一步:SingleStep():

void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) { fd_set readSet = fReadSet; // make a copy for this select() call //计算select socket们时的超时时间。 DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm(); struct timeval tv_timeToDelay; tv_timeToDelay.tv_sec = timeToDelay.seconds(); tv_timeToDelay.tv_usec = timeToDelay.useconds(); // Very large "tv_sec" values cause select() to fail. // Don't make it any larger than 1 million seconds (11.5 days) const long MAX_TV_SEC = MILLION; if (tv_timeToDelay.tv_sec > MAX_TV_SEC) { tv_timeToDelay.tv_sec = MAX_TV_SEC; } // Also check our "maxDelayTime" parameter (if it's > 0): if (maxDelayTime > 0 && (tv_timeToDelay.tv_sec > (long)maxDelayTime/MILLION || (tv_timeToDelay.tv_sec == (long)maxDelayTime/MILLION && tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) { tv_timeToDelay.tv_sec = maxDelayTime/MILLION; tv_timeToDelay.tv_usec = maxDelayTime%MILLION; } //先执行socket的select操作,以确定哪些socket任务(handler)需要执行。 int selectResult = select(fMaxNumSockets, &readSet, NULL, NULL, &tv_timeToDelay); if (selectResult < 0) { #if defined(__WIN32__) || defined(_WIN32) int err = WSAGetLastError(); // For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if // it was called with no entries set in "readSet". If this happens, ignore it: if (err == WSAEINVAL && readSet.fd_count == 0) { err = 0; // To stop this from happening again, create a dummy readable socket: int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0); FD_SET((unsigned)dummySocketNum, &fReadSet); } if (err != 0) { #else if (errno != EINTR && errno != EAGAIN) { #endif // Unexpected error - treat this as fatal: #if !defined(_WIN32_WCE) perror("BasicTaskScheduler::SingleStep(): select() fails"); #endif exit(0); } } // Handle any delayed event that may have come due: //执行一个最迫切的延迟任务。 fDelayQueue.handleAlarm(); // Call the handler function for one readable socket: HandlerIterator iter(*fReadHandlers); HandlerDescriptor* handler; // To ensure forward progress through the handlers, begin past the last // socket number that we handled: if (fLastHandledSocketNum >= 0) { //找到上次执行的socket handler的下一个 while ((handler = iter.next()) != NULL) { if (handler->socketNum == fLastHandledSocketNum) break; } if (handler == NULL) { fLastHandledSocketNum = -1; iter.reset(); // start from the beginning instead } } //从找到的handler开始,找一个可以执行的handler,不论其状态是可读,可写,还是出错,执行之。 while ((handler = iter.next()) != NULL) { if (FD_ISSET(handler->socketNum, &readSet) && FD_ISSET(handler->socketNum, &fReadSet) /* sanity check */ && handler->handlerProc != NULL) { fLastHandledSocketNum = handler->socketNum; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. (*handler->handlerProc)(handler->clientData, SOCKET_READABLE); break; } } //如果寻找完了依然没有执行任何handle,则从头再找。 if (handler == NULL && fLastHandledSocketNum >= 0) { // We didn't call a handler, but we didn't get to check all of them, // so try again from the beginning: iter.reset(); while ((handler = iter.next()) != NULL) { if (FD_ISSET(handler->socketNum, &readSet) && FD_ISSET(handler->socketNum, &fReadSet) /* sanity check */ && handler->handlerProc != NULL) { fLastHandledSocketNum = handler->socketNum; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. (*handler->handlerProc)(handler->clientData, SOCKET_READABLE); break; } } //依然没有找到可执行的handler。 if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler } }

总结为以下四步:   1> 为所有需要操作的socket执行select。  2> 找到第一个应执行的延迟任务并执行之。  3> 找出第一个应执行的socket任务(handler)并执行之。

  可见,每一步中只执行三个任务队列中的一项。到这里,我们不尽要问这些socket handler和delay task是哪里来的?

  DelayQueue类:译为"延迟队列",它是一个队列,每一项代表了一个要调度的任务(在它的fToken变量中保存)。同时保存了这个任务离执行时间点的剩余时间。可以预见,它就是在TaskScheduler中用于管理调度任务的东西。注意:此队列中的任务只被执行一次!执行完后这一项即被无情抛弃!

  HandlerSet类:Handler集合。Handler是什么呢?它是一种专门用于执行socket操作的任务(函数),HandlerSet被TaskScheduler用来管理所有的socket任务(增删改查)。所以TaskScheduler中现在已调度两种任务了:socket任务(handlerSet)和延迟任务(DelayQueue).其实TaskScheduler还调度第三种任务:Event,介个后面再说。

  HandlerDescriptor类:socket handlet描述。

  socket handler加入执行队列后会一直存在,而delay task在执行完一次后会立即弃掉。

  socket handler保存在队列BasicTaskScheduler0::HandlerSet* fHandlers中,通过调用envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this)这样的方式加入到socket handler队列中:

void BasicTaskScheduler::turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void* clientData) { if (socketNum < 0) return; FD_SET((unsigned)socketNum, &fReadSet); fReadHandlers->assignHandler(socketNum, handlerProc, clientData); if (socketNum+1 > fMaxNumSockets) { fMaxNumSockets = socketNum+1; } }

  socket handler添加时为什么需要那些参数呢?socketNum是需要的,因为要select socket(socketNum即是socket()返回的那个socket对象)。conditionSet也是需要的,它用于表明socket在select时查看哪种装态,是可读?可写?还是出错?proc和clientData这两个参数就不必说了(真有不明白的吗?)。再看BackgroundHandlerProc的参数,socketNum不必解释,mask是什么呢?它正是对应着conditionSet,但它表明的是select之后的结果,比如一个socket可能需要检查其读/写状态,而当前只能读,不能写,那么mask中就只有表明读的位被设置。

void HandlerSet::assignHandler(int socketNum, TaskScheduler::BackgroundHandlerProc* handlerProc, void* clientData) { // First, see if there's already a handler for this socket: HandlerDescriptor* handler; HandlerIterator iter(*this); while ((handler = iter.next()) != NULL) { if (handler->socketNum == socketNum) break; } if (handler == NULL) { // No existing handler, so create a new descr: handler = new HandlerDescriptor(fHandlers.fNextHandler); handler->socketNum = socketNum; } handler->handlerProc = handlerProc; handler->clientData = clientData; }

  delay task保存在队列BasicTaskScheduler0::DelayQueue fDelayQueue中,通过调用envir().taskScheduler().scheduleDelayedTask(0,  (TaskFunc*)FramedSource::afterGetting, this)这样的方式加入到delay task队列中,需要传入task延迟等待的微秒(百万分之一秒)数(第一个参数):

TaskToken BasicTaskScheduler0::scheduleDelayedTask(int64_t microseconds,TaskFunc* proc, void* clientData) {   if (microseconds < 0)     microseconds = 0;   //DelayInterval 是表示时间差的结构   DelayInterval timeToDelay((long) (microseconds / 1000000),(long) (microseconds % 1000000));   //创建delayQueue中的一项   AlarmHandler* alarmHandler = new AlarmHandler(proc, clientData,timeToDelay);   //加入DelayQueue   fDelayQueue.addEntry(alarmHandler);   //返回delay task的唯一标志   return (void*) (alarmHandler->token()); }

  delay task的执行都在函数fDelayQueue.handleAlarm()中,handleAlarm()在类DelayQueue中实现。看一下handleAlarm():

void DelayQueue::handleAlarm() {   //如果第一个任务的执行时间未到,则同步一下(重新计算各任务的等待时间)。   if (head()->fDeltaTimeRemaining != DELAY_ZERO)     synchronize();   //如果第一个任务的执行时间到了,则执行第一个,并把它从队列中删掉。   if (head()->fDeltaTimeRemaining == DELAY_ZERO) {     // This event is due to be handled:     DelayQueueEntry* toRemove = head();     removeEntry(toRemove); // do this first, in case handler accesses queue     //执行任务,执行完后会把这一项销毁。     toRemove->handleTimeout();   } }

  可能感觉奇怪,其它的任务队列都是先搜索第一个应该执行的项,然后再执行,这里干脆,直接执行第一个完事。那就说明第一个就是最应该执行的一个吧?也就是等待时间最短的一个吧?那么应该在添加任务时,将新任务跟据其等待时间插入到适当的位置而不是追加到尾巴上吧?猜得对不对还得看fDelayQueue.addEntry(alarmHandler)这个函数是怎么执行的。

void DelayQueue::addEntry(DelayQueueEntry* newEntry) {   //重新计算各项的等待时间   synchronize();   //取得第一项   DelayQueueEntry* cur = head();   //从头至尾循环中将新项与各项的等待时间进行比较   while (newEntry->fDeltaTimeRemaining >= cur->fDeltaTimeRemaining) {     //如果新项等待时间长于当前项的等待时间,则减掉当前项的等待时间。     //也就是后面的等待时几只是与前面项等待时间的差,这样省掉了记录插入时的时间的变量。     newEntry->fDeltaTimeRemaining -= cur->fDeltaTimeRemaining;     //下一项     cur = cur->fNext;   }   //循环完毕,cur就是找到的应插它前面的项,那就插它前面吧   cur->fDeltaTimeRemaining -= newEntry->fDeltaTimeRemaining;   // Add "newEntry" to the queue, just before "cur":   newEntry->fNext = cur;   newEntry->fPrev = cur->fPrev;   cur->fPrev = newEntry->fPrev->fNext = newEntry; }

groupsock:

  groupsock库中包括了GroupEId、Groupsock、GroupsockHelper、NetAddress、NetInterface等类。

  这个是放在单独的库Groupsock中。它封装了socket操作,支持udp多播放支持和一对多单播的功能,tcp socket创建。Groupsock类有两个构造函数:

  一个是ASM(即任意源组播模型):

// Constructor for a source-independent multicast group Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr, Port port, u_int8_t ttl) :OutputSocket(env, port),//创建udp socket deleteIfNoMembers(False), isSlave(False), fIncomingGroupEId(groupAddr, port.num(), ttl), fDests(NULL), fTTL(ttl) { addDestination(groupAddr, port);//记录组播地址 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {//如果非组播地址,则不会进入里面,组播地址,则加入组播 if (DebugLevel >= 1) { env = 0) { // this is a fatal error env = 2) env = 3) { env = 1) { env = 2) env fStreamSocketNum(注意:tcp单播:服务端的socket是绑定的,每次客服端请求便会创建一个clientsocket, 而客服端rtsp rtp rtcp协议用就是用的这个socket;udp单播,live555没有采用connect的方式,直接setDestinations函数加入到fDests链表中,发送时候的目标地址在此链表中),当rtp和rtcp发送数据的时候调用的RTPInterface::sendPacket函数将会用到:

void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) { // Normal case: Send as a UDP packet: fGS->output(envir(), fGS->ttl(), packet, packetSize);//udp传输 // Also, send over each of our TCP sockets: for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {//tcp传输 sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId); } }

  setDestinations()函数定义在MediaSubsession.cpp中:

void MediaSubsession::setDestinations(unsigned defaultDestAddress) { // Get the destination address from the connection endpoint name // (This will be 0 if it's not known, in which case we use the default) unsigned destAddress = connectionEndpointAddress(); if (destAddress == 0) destAddress = defaultDestAddress; struct in_addr destAddr; destAddr.s_addr = destAddress; // The destination TTL remains unchanged: int destTTL = ~0; // means: don't change if (fRTPSocket != NULL) { Port destPort(serverPortNum);//这里serverPortNum是在服务端应答中获取rtp的端口号 fRTPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//这里的fRTPSocket是在前面的initiate函数中创建的 } if (fRTCPSocket != NULL && !isSSM()) { // Note: For SSM sessions, the dest address for RTCP was already set. Port destPort(serverPortNum+1);//这个是rtcp的端口号 fRTCPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//这里的fRTCPSocket在前面的initiate函数创建的 } } changeDestinationParameters()定义在Groupsock.cpp中: //改变目的地址的参数 //newDestAddr是新的目的地址 //newDestPort是新的目的端口 //newDestTTL是新的TTL void Groupsock::changeDestinationParameters( struct in_addr const& newDestAddr, Port newDestPort, int newDestTTL) { if (fDests == NULL) return; //获取第一个目的地址(此处不是很明白:fDest是一个单向链表,每次添加一个目的地址, //都会把它插入到最前目,难道这个函数仅改变最后一个添加的目的地址?) struct in_addr destAddr = fDests->fGroupEId.groupAddress(); if (newDestAddr.s_addr != 0) { if (newDestAddr.s_addr != destAddr.s_addr && IsMulticastAddress(newDestAddr.s_addr)) { //如果目的地址是一个多播地址,则离开老的多播组,加入新的多播组。 socketLeaveGroup(env(), socketNum(), destAddr.s_addr); socketJoinGroup(env(), socketNum(), newDestAddr.s_addr); } destAddr.s_addr = newDestAddr.s_addr; } portNumBits destPortNum = fDests->fGroupEId.portNum(); if (newDestPort.num() != 0) { if (newDestPort.num() != destPortNum && IsMulticastAddress(destAddr.s_addr)) { //如果端口也不一样,则先更改本身socket的端口 //(其实是关掉原先的socket的,再以新端口打开一个socket)。 changePort(newDestPort); //然后把新的socket加入到新的多播组。 // And rejoin the multicast group: socketJoinGroup(env(), socketNum(), destAddr.s_addr); } destPortNum = newDestPort.num(); fDests->fPort = newDestPort; } u_int8_t destTTL = ttl(); if (newDestTTL != ~0) destTTL = (u_int8_t) newDestTTL; //目标地址的所有信息都在fGroupEId中,所以改变成员fGroupEId。 fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL); //(看起来这个函数好像只用于改变多播时的地址参数, //以上分析是否合理,肯请高人指点) }

liveMedia:

  是很重要的一个库,其不仅包含了实现RTSP Server的类,还包含了针对不同流媒体类型(如TS流、PS流等)编码的类。在该库中,基类是Medium,层次关系非常清晰。在该库中,有几个很重要的类,如RTSPServer、ServerMediaSession、RTPSink、RTPInterface、FramedSource等。  

  从上面这个主要的类结构可以看出,liveMedia库中的基类为Medium,其下又有几个非常重要的部分,一个是×××Subsession,除Medium父类外,所有的×××Subsession类都继承于ServerMediaSubsession;一个是×××Source,MediaSource的frameSource下主要包含FramedFileSource、RTPSource、FramedFilter等几个主要的部分;一个是MediaSink,以继承于RTPSink的类居多。

  此外,还包含了用于处理packet的BufferedPacketFactory和BufferedPacket及其相关子类,用于处理流分析的StreamParser及其子类。

公用类:

  RTPInterface类:发送rtp rtcp数据包;startNetworkReading函数注册tcp读取rtp rtcp数据的回调函数,注册udp读取rtp rtcp数据的回调函数。

  RTCPInstance类:rtcp协议的实现,创建RTPInterface类实例fRTCPInterface。

服务端实现需要以下几个基类:  

  RTSPServer:创建rtsp分tcp socket,注册任务incomingConnectionHandler,在此任务函数里,accept接收客服端连接,创建RTSPClientSession类;

  RTSPClientSession类:注册任务incomingRequestHandler,在此任务函数里,readSocket读取客服端发送的信息,并且解析出cmdName,收到"OPTIONS"命令则通过handleCmd_OPTIONS函数应答,收到"DESCRIBE"则通过handleCmd_DESCRIBE函数应答,收到 "SETUP"命令则通过handleCmd_SETUP函数应答,收到 "TEARDOWN"、 "PLAY"、"PAUSE"、"GET_PARAMETER"通过handleCmd_withinSession函数应答,如果都不是则通过handleCmd_notSupported函数应答。  

    ServerMediaSubsession类:多媒体流可能包含几个子流,每个流都带有fTrackId,比如视频流track1,音频流track2

  ServerMediaSession类:一个多媒体流增加addSubsession子流,产生generateSDPDescription SDP描述。

      FramedSource类:提供虚函数doGetNextFrame函数去获取信息,此函数具体实现在派生类中,比如我的是在OpenFileSource类实现。

  RTPSink类:服务端数据是从 FramedSource流到 RTPSink,并且创建RTPInterface类实例fRTPInterface。

单播:

  StreamState类:startPlaying函数开始播放,endPlaying函数结束播放,pause函数暂停播放。

  Destinations类:udp连接,保存isTCP为false,目标地址,rtp的端口,rtcp的端口;tcp连接,保存isTcp为TRUE,tcp socket值,rtp rtcp通道ID。

  OnDemandServerMediaSubsession类:单播的时候,通过getStreamParameters函数创建udp socket,创建FramedSource, 创建RTPSink,创建StreamState,创建Destinations,当收到“PLAY”的命令时调用startStream开始播放,当收到“PAUSE”时调用pauseStream函数暂停播放,当收到“TERADOWN”时停止播放。 

组播:

  PassiveServerMediaSubsession类:getStreamParameters函数改变rtp和rtcp的目标地址。

客服端实现需要以下几个基类:

  RTPSource:客户端数据则是从 RTPSource 流到 XXXFileSink, 并且创建RTPInterface类实例fRTPInterface。

  MediaSubsession类:initiate函数创建rtp rtcp socket,并且根据服务端传来的编码类型fCodecName来创建相应的RTPSource,创建RTCPInstance;解析SDP的各种属性;setDestinations函数设置服务端地址。

  RTSPClient: 创建tcp socket,连接rtsp服务端,sendOptionsCmd函数发送“OPTIONS”命令; describeURL函数则是发送“DESCRIBE”命令;announceSDPDescription函数则是发送“ANNOUNCE”命令;setupMediaSubsession函数则是发送“SETUP”命令,并且设置socket的目标地址;playMediaSession函数则是发送“PLAY”命令;pauseMediaSession函数则是发送“PAUSE”命令;等等。主要是RTSP的通信处理。

  基本上,整个liveMedia库的主要类结构就是这样。

  在http://www.live555.com上的相关文档中提到穿透防火墙的问题,方法是开启一个HTTP的tunnel,然后我们可以在liveMedia库中找到一个RTSPOverHTTPServer的类,该类解决了这样的问题。

mediaServer:

   Live555MediaServer提供了main函数,DynamicRTSPServer继承了RTSPServer并重写了虚函数lookupServerMediaSession。用不到。 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3