浅读 Libco

Posted by Masutangu on October 5, 2016

今天花了一天时间,学习了下微信的开源协程库 libco的代码,写下来做个纪录,有部分细节代码(包括 coctx_swap.S 那段汇编)我还没读懂,以后再补充进来。

协程的原理

协程的概念和优点这里不再赘述。我们先介绍下实现协程的原理,再来看看相应的代码。

协程的切换,其实就是由我们手动来管理指令执行的上下文。一般每一个协程有自己的 context_buff 来保存自己的运行上下文(寄存器和栈)。当需要挂起当前协程时,将当前的上下文保存到该协程的 context_buff,并把当前上下文重置为新的协程的 context_buff 即可。

hook 系统调用

一般会在有 IO 阻塞操作的时候做协程的切换。如何让协程的使用者无需关心这些切换的细节呢?libco 采用 hooking IO 函数的方法。将 IO 设置为非阻塞,提交给 epoll 来管理,并让出 cpu 资源,等到 epoll 事件返回后再 resume 对应的协程。下面的代码会给出相应的例子。

libco 源码

  • 协程相关数据结构

    stCoRoutineEnv_t 管理协程的结构体,每起一个新的协程就压入 pCallStack 中,每挂起一个协程就将其踢出 pCallStack。

    1
    2
    3
    4
    5
    6
    
      struct stCoRoutineEnv_t
      {
          stCoRoutine_t *pCallStack[ 128 ];
          int iCallStackSize;
          stCoEpoll_t *pEpoll;
      };
    

    stCoRoutine_t 封装了协程对象,coctx_t 保存协程的 context。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
      struct stCoRoutine_t
      {
          stCoRoutineEnv_t *env;
          pfn_co_routine_t pfn;
          void *arg;
          coctx_t ctx;
          char cStart;
          char cEnd;
          stCoSpec_t aSpec[1024];
          char cIsMain;
          char cEnableSysHook;
          char sRunStack[ 1024 * 128 ];
      };
    
  • epoll 相关数据结构

    管理 epoll 的结构体,pTimeout 管理 Timeout 事件。pstTimeoutList 为超时事件的列表。pstTimeoutList 和 pstActiveList 下面会看到其用法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
      struct stCoEpoll_t
      {
          int iEpollFd;
          static const int _EPOLL_SIZE = 1024 * 10;
          struct stTimeout_t *pTimeout;
          struct stTimeoutItemLink_t *pstTimeoutList;  
          struct stTimeoutItemLink_t *pstActiveList;
          co_epoll_res *result; 
    
      };
    

    stTimeout_t 封装了 Timeout 结构体。pItems 是一个链表数组,具体用法可以查看 int AddTimeout( stTimeout_t *apTimeout,stTimeoutItem_t *apItem ,unsigned long long allNow ) 函数。

    1
    2
    3
    4
    5
    6
    7
    
      struct stTimeout_t
      {
          stTimeoutItemLink_t *pItems;
          int iItemSize;
          unsigned long long ullStart;
          long long llStartIdx;
      };
    
  • hooking IO

    下面以 read 函数为例子,看看 libco 是如何将 io 操作通过 epoll 来管理并与协程结合起来。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    
      ssize_t read( int fd, void *buf, size_t nbyte )
      {
          HOOK_SYS_FUNC( read );
          if( !co_is_enable_sys_hook() )  // 如果没开启hook,则返回系统的 read 函数
          {
              return g_sys_read_func( fd,buf,nbyte );
          }
          rpchook_t *lp = get_by_fd( fd );
    
          if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 
          {
              ssize_t ret = g_sys_read_func( fd,buf,nbyte );
              return ret;
          }
          int timeout = ( lp->read_timeout.tv_sec * 1000 ) 
                      + ( lp->read_timeout.tv_usec / 1000 );
    
          struct pollfd pf = { 0 };
          pf.fd = fd;
          pf.events = ( POLLIN | POLLERR | POLLHUP );
    
          int pollret = poll( &pf,1,timeout );  // 这里调用 poll 了,注意 poll 会挂起当前协程让出cpu,下面的指令需要等到该协程 resume 才会继续执行了。
    
          ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
    		
          return readret;
    		
      }
    
      int poll(struct pollfd fds[], nfds_t nfds, int timeout)
      {
    
          HOOK_SYS_FUNC( poll );
    
          if( !co_is_enable_sys_hook() )
          {
              return g_sys_poll_func( fds,nfds,timeout );
          }
    
          return co_poll( co_get_epoll_ct(),fds,nfds,timeout );  // 调用的是 co_poll
      }
    
  • epoll 逻辑

    最主要的代码都在 co_poll 函数和 co_eventloop 函数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    
      int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout )
      {
          int epfd = ctx->iEpollFd;
    
          //1.struct change
          stPoll_t arg;
          memset( &arg,0,sizeof(arg) );
    
          arg.iEpollFd = epfd;
          arg.fds = fds;
          arg.nfds = nfds;
    
          stPollItem_t arr[2];
          if( nfds < sizeof(arr) / sizeof(arr[0]) )
          {
              arg.pPollItems = arr;
          }	
          else
          {
              arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
          }
    
          memset( arg.pPollItems, 0, nfds * sizeof(stPollItem_t) );
    
          arg.pfnProcess = OnPollProcessEvent;  // 当 epoll 事件被触发,就会调用该函数来 resume 相应的协程。
          arg.pArg = GetCurrCo( co_get_curr_thread_env() );  // pArg 保存当前的协程,pfnProcess 函数中用该字段来得到需要 resume 的协程对象。
    		
          //2.add timeout
    
          unsigned long long now = GetTickMS();
          arg.ullExpireTime = now + timeout;
          int ret = AddTimeout( ctx->pTimeout,&arg,now );  // 调用 AddTimeout,由 stCoEpoll_t 管理超时。
    		
          //3. add epoll
    
          for(nfds_t i = 0; i < nfds; i++)
          {
              arg.pPollItems[i].pSelf = fds + i;
              arg.pPollItems[i].pPoll = &arg;
    
              arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
              struct epoll_event &ev = arg.pPollItems[i].stEvent;
    
              if( fds[i].fd > -1 )
              {
                  ev.data.ptr = arg.pPollItems + i;
                  ev.events = PollEvent2Epoll( fds[i].events );
    
                  co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );  // 添加到 epoll 中监听
              }
              //if fail,the timeout would work
    			
          }
    
          co_yield_env( co_get_curr_thread_env() );  // 让出 cpu,挂起当前协程了。等到 stCoEpoll_t resume 该协程再继续执行下面的指令了
    
          // 下面都是清理工作 可以不用细看
          RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &arg );
          for(nfds_t i = 0;i < nfds;i++)
          {
              int fd = fds[i].fd;
              if( fd > -1 )
              {
                  co_epoll_ctl( epfd,EPOLL_CTL_DEL,fd,&arg.pPollItems[i].stEvent );
              }
          }
    
    
          if( arg.pPollItems != arr )
          {
              free( arg.pPollItems );
              arg.pPollItems = NULL;
          }
          return arg.iRaiseCnt;
      }
    
      // 事件循环
      void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
      {
          if( !ctx->result )
          {
              ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
          }
          co_epoll_res *result = ctx->result;
    
          for(;;)
          {
              int ret = co_epoll_wait( ctx->iEpollFd, result, stCoEpoll_t::_EPOLL_SIZE, 1 );
    
              stTimeoutItemLink_t *active = (ctx->pstActiveList);
              stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
    
              memset( timeout,0,sizeof(stTimeoutItemLink_t) );
    
              for(int i=0;i<ret;i++)
              {
                  stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
                  if( item->pfnPrepare )
                  {
                      item->pfnPrepare( item,result->events[i],active );
                  }
                  else
                  {
                      AddTail( active,item );  // 监听到的事件放到 active 链表里
                  }
              }
    
    
              unsigned long long now = GetTickMS();
              TakeAllTimeout( ctx->pTimeout,now,timeout );  // 超时的事件放到 timeout 链表里
    
              stTimeoutItem_t *lp = timeout->head;
              while( lp )
              {
                  lp->bTimeout = true;
                  lp = lp->pNext;
              }
    
              Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );  // 合并 active 和 timeout 链表
    
              lp = active->head;
              while( lp )
              {
                  PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
                  if( lp->pfnProcess )
                  {
                      lp->pfnProcess( lp );  // 一个个拿出来处理,调用 pfnProcess 函数,即 OnPollProcessEvent 函数
                  }
                  lp = active->head;
              }
              if( pfn )
              {
                  if( -1 == pfn( arg ) )
                  {
                      break;
                  }
              }
          }
      }
    
      void OnPollProcessEvent( stTimeoutItem_t * ap )
      {
          stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;  // 从上面知道,pArg 保存了该事件对应的协程
          co_resume( co );   // resume 相应的协程
      }
    
  • 协程的挂起和恢复

    协程的挂起和恢复由 stCoRoutineEnv_t 的 pCallStack 来管理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
      void co_yield_env( stCoRoutineEnv_t *env )  // 挂起当前协程,恢复其父协程
      {
          stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];  // last 可以认为是父协程
          stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];  // curr 为当前执行的协程
          env->iCallStackSize--;
          coctx_swap( &curr->ctx, &last->ctx );  // 保持 curr 协程的上下文, 并恢复 last 协程的上下文
      }
    
      void co_resume( stCoRoutine_t *co )   // 恢复 co 协程
      {
          stCoRoutineEnv_t *env = co->env;
          stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
          if( !co->cStart )
          {
              coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
              co->cStart = 1;
          }
          env->pCallStack[ env->iCallStackSize++ ] = co;  // 执行协程的时候压入 pCallStack 栈中
          coctx_swap( &(lpCurrRoutine->ctx),&(co->ctx) );  // 恢复 co 协程的上下文
      }
    

流程图

简化版的流程如下图所示: