神刀安全网

muduo net library

TCP网络编程最本质是的处理三个半事件

  • 连接建立:服务器accept(被动)接受连接,客户端connect(主动)发起连接
  • 连接断开:主动断开(close、shutdown),被动断开(read返回0)
  • 消息到达:文件描述符可读
  • 消息发送完毕:这算半个。对于低流量的服务,可不必关心这个事件;这里的发送完毕是指数据写入操作系统缓冲区,将由TCP协议栈负责数据的发送与重传,不代表对方已经接收到数据。

EchoServer类图

muduo net library

什么都不做的EventLoop

  • one loop per thread

意思是说每个线程最多只能有一个EventLoop对象。

  • EventLoop对象构造的时候,会检查当前线程是否已经创建了其他EventLoop对象,如果已创建,终止程序(LOG

    FATAL)

    = EventLoop构造函数会记住本对象所属线程(threadId

    )。

  • 创建了EventLoop对象的线程称为IO线程,其功能是运行事件循环(EventLoop::loop)

EventLoop 头文件

EventLoop.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <boost/noncopyable.hpp>

#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>

namespace muduo
{
namespace net
{

///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
public:
EventLoop();
~EventLoop(); // force out-line dtor, for scoped_ptr members.

///
/// Loops forever.
///
/// Must be called in the same thread as creation of the object.
///
void loop();

//断言是否是当前的线程

void assertInLoopThread()
{

//过是当前线程,直接跳过,否则调用abortNotInLoopThread
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}
//测试是否为当前线程
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

static EventLoop* getEventLoopOfCurrentThread();

private:
void abortNotInLoopThread();

bool looping_; /* atomic , 是否处于事件循环*/
const pid_t threadId_; // 当前对象所属线程ID
};

}
}
#endif // MUDUO_NET_EVENTLOOP_H

EventLoop 源文件

EventLoop.cpp

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/EventLoop.h>

#include <muduo/base/Logging.h>

#include <poll.h>

using namespace muduo;
using namespace muduo::net;

namespace
{
// 当前线程EventLoop对象指针
// 线程局部存储 ,如果不用__thread修饰的话,那么就是线程共共享的了
//这样达不到目标,。。
//初始化时,设为0就OK了
__thread EventLoop* t_loopInThisThread = 0;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}

EventLoop::EventLoop()
: looping_(false), //初始化为false ,表示当前还没有处于事件循环状态
threadId_(CurrentThread::tid()) //当前的线程Id ,用于标识
{
LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
// 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL),否者的话,设为当前线程this
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
}

EventLoop::~EventLoop()
{
t_loopInThisThread = NULL;
}

// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
//断言还没有事件循环
assert(!looping_);
// 断言当前处于创建该对象的线程中
assertInLoopThread();
//把事件循环标识设为true
looping_ = true;
LOG_TRACE << "EventLoop " << this << " start looping";
//关注事件为NULL,个数为0,延时5000
::poll(NULL, 0, 5*1000);

LOG_TRACE << "EventLoop " << this << " stop looping";
//事件循环标识设为false ,这只是测试程序
looping_ = false;
}

//终止程序
void EventLoop::abortNotInLoopThread()
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " << CurrentThread::tid();
}

EventLoop 的测试程序1

#include <muduo/net/EventLoop.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

/*
该程序 主要是用来测试EventLoop 是否是 一个Thread一个EventLoop
**/

void threadFunc()
{

printf("threadFunc(): pid = %d, tid = %d/n",
getpid(), CurrentThread::tid());

EventLoop loop;
loop.loop();
}

int main(void)
{

printf("main(): pid = %d, tid = %d/n",
getpid(), CurrentThread::tid());

EventLoop loop;

Thread t(threadFunc);
t.start();

loop.loop();
t.join();
return 0;
}

程序输出

[root@localhost bin]# ./reactor_test01
main(): pid = 2724, tid = 2724
20131018 04:12:55.305234Z 2724 TRACE EventLoop EventLoop created 0xBFA6B2C0 in thread 2724 - EventLoop.cc:36
20131018 04:12:55.305599Z 2724 TRACE loop EventLoop 0xBFA6B2C0 start looping - EventLoop.cc:62
threadFunc(): pid = 2724, tid = 2725
20131018 04:12:55.305792Z 2725 TRACE EventLoop EventLoop created 0xB77E0068 in thread 2725 - EventLoop.cc:36
20131018 04:12:55.305809Z 2725 TRACE loop EventLoop 0xB77E0068 start looping - EventLoop.cc:62
20131018 04:13:00.321713Z 2724 TRACE loop EventLoop 0xBFA6B2C0 stop looping - EventLoop.cc:66
20131018 04:13:00.321779Z 2725 TRACE loop EventLoop 0xB77E0068 stop looping - EventLoop.cc:66
[root@localhost bin]#

测试程序2

#include <muduo/net/EventLoop.h>

#include <stdio.h>
/**
该程序主要用来测试,多个线程使用同一个eventloop时,程序将会错误终止!!
**/

using namespace muduo;
using namespace muduo::net;

EventLoop* g_loop;

void threadFunc()
{

g_loop->loop();
}

int main(void)
{

EventLoop loop;
g_loop = &loop;
Thread t(threadFunc);
t.start();
t.join();
return 0;
}

程序输出

[root@localhost bin]# ./reactor_test02
20131018 04:15:09.010234Z 2730 TRACE EventLoop EventLoop created 0xBFD53730 in thread 2730 - EventLoop.cc:36
20131018 04:15:09.010768Z 2731 FATAL EventLoop::abortNotInLoopThread - EventLoop 0xBFD53730 was created in threadId_ = 2730, current thread id = 2731 - EventLoop.cc:72
Aborted
[root@localhost bin]#

[27] EpollPoller

muduo net library

EpollPoller的头文件

epollpller.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H

#include <muduo/net/Poller.h>

#include <map>
#include <vector>

struct epoll_event;

namespace muduo
{
namespace net
{

///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop* loop);
virtual ~EPollPoller();
// timeoutMs 超时事件
// activeChannels活动通道
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
//更新通道
virtual void updateChannel(Channel* channel);
//移除通道
virtual void removeChannel(Channel* channel);

private:
// EventListd的初始值
static const int kInitEventListSize = 16;

void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
;

//更新
void update(int operation, Channel* channel);

typedef std::vector<struct epoll_event> EventList;

typedef std::map<int, Channel*> ChannelMap;

//文件描述符 = epoll_create1(EPOLL_CLOEXEC),用来表示要关注事件的fd的集合的描述符
int epollfd_;
// epoll_wait返回的活动的通道channelList
EventList events_;
//通道map
ChannelMap channels_;
};

}
}
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H

EpollPoller的源文件

epollpoller.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/poller/EPollPoller.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>

#include <boost/static_assert.hpp>

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>

using namespace muduo;
using namespace muduo::net;

// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);

namespace
{
const int kNew = -1; //新通道
const int kAdded = 1; //要关注的通道
const int kDeleted = 2; //要删除的通道
}

EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if (epollfd_ < 0)
{
LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
}

EPollPoller::~EPollPoller()
{
::close(epollfd_);
}

// IO 线程
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
//监听事件的到来
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);

Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happended";
/*添加活动通道channel*/
fillActiveChannels(numEvents, activeChannels);
//如果活动通道的容器已满,则增加活动通道容器的容量
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happended";
}
else
{
LOG_SYSERR << "EPollPoller::poll()";
}
return now;
}

/*添加活动通道channel*/
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
//如果是调试状态,则
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
//否则直接跳到这里
// 设置channel的“可用事件”
channel->set_revents(events_[i].events);
// 加入活动通道容器
activeChannels->push_back(channel);
}
}

//更新某个通道 channel
void EPollPoller::updateChannel(Channel* channel)
{
//断言 在IO线程中
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();

//channel 的默认值是 -1 , ---->>channel class
const int index = channel->index();
// kNew 表示有新的通道要增加,kDeleted表示将已不关注事件的fd重新关注事件,及时重新加到epollfd_中去
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
//如果是新的channel,那么在channels_里面是找不到的
assert(channels_.find(fd) == channels_.end());
//添加到channels_中
channels_[fd] = channel;
}

else // index == kDeleted
{
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
//
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
//断言已经在channels_里面了,并且已在epollfd_中
assert(index == kAdded);

//剔除channel的关注事件
//如果channel没有事件关注了,就把他从epollfd_中剔除掉
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
//更新index = kDeleted
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}

// 移除channel
void EPollPoller::removeChannel(Channel* channel)
{
//断言实在IO线程中
Poller::assertInLoopThread();
int fd = channel->fd();
LOG_TRACE << "fd = " << fd;
//断言能在channels_里面找到channel
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
//断言所要移除的channel已经没有事件关注了,但是此时在event_里面可能还有他的记录
assert(channel->isNoneEvent());
int index = channel->index();
//断言
assert(index == kAdded || index == kDeleted);
//真正从channels_里面删除掉channel
size_t n = channels_.erase(fd);
(void)n;
assert(n == 1);

//从event_中剔除channel
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}

// channel现在变成新的channel了
channel->set_index(kNew);
}

void EPollPoller::update(int operation, Channel* channel)
{
struct epoll_event event;
bzero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
//更新操作
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
//写入日志
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
}
}
}

测试程序

Reactor_test03.cc

#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <sys/timerfd.h>

using namespace muduo;
using namespace muduo::net;

EventLoop* g_loop;
int timerfd;

void timeout(Timestamp receiveTime)
{

printf("Timeout!/n");
uint64_t howmany;
::read(timerfd, &howmany, sizeof howmany);
g_loop->quit();
}

int main(void)
{

EventLoop loop;
g_loop = &loop;

timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
Channel channel(&loop, timerfd);
channel.setReadCallback(boost::bind(timeout, _1));
channel.enableReading();

struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 1;
::timerfd_settime(timerfd, 0, &howlong, NULL);

loop.loop();

::close(timerfd);
}

程序输出

[root@localhost bin]# ./reactor_test03
20131020 02:24:05.657327Z 4009 TRACE EventLoop EventLoop created 0xBFD2AAD4 in thread 4009 - EventLoop.cc:42
20131020 02:24:05.657513Z 4009 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131020 02:24:05.657554Z 4009 TRACE loop EventLoop 0xBFD2AAD4 start looping - EventLoop.cc:68
20131020 02:24:06.658756Z 4009 TRACE poll 1 events happended - EPollPoller.cc:65
20131020 02:24:06.658972Z 4009 TRACE printActiveChannels {4: IN } - EventLoop.cc:139
Timeout!
20131020 02:24:06.659008Z 4009 TRACE loop EventLoop 0xBFD2AAD4 stop looping - EventLoop.cc:93
[root@localhost bin]#

[28] muduo的定时器由三个类实现,TimerId(定时器)、Timer(最上层的抽象)、TimerQueue(定时器的列表)

  • muduo的定时器由三个类实现,TimerId(定时器)、Timer(最上层的抽象,并没有调用定时器的相关函数)、TimerQueue(定时器的列表),用户只能看到第一个类,其它两个都是内部实现细节
  • TimerQueue的接口很简单,只有两个函数addTimer和cancel。实际上这两个函数也没有向外部开发,访问它要通过EventLoop的相关方法
  • EventLoop

    runAt    在某个时刻运行定时器------>TimerQueue.addTimer

    runAfter 过一段时间运行定时器---->TimeQueue.addTimer

    runEvery 每隔一段时间运行定时器->TimerQueue.addTimer

    cancel 取消定时器 ----》TimeQueue.cancal

  • TimerQueue数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set;如果选择map的话,是有问题的,可能TimeQueue里面有时间戳是一样的,但定时器timer*是不一样的,可以考虑multimap,不过最后还是不要使用

typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

muduo net library

时序图

muduo net library

Timer头文件

timer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H

#include <boost/noncopyable.hpp>

#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>

namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
public:
Timer(const TimerCallback& cb, Timestamp when, double interval)
: callback_(cb),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }

void run() const
{

callback_();
}

Timestamp expiration() const { return expiration_; }
bool repeat() const { return repeat_; }
int64_t sequence() const { return sequence_; }

void restart(Timestamp now);

static int64_t numCreated() { return s_numCreated_.get(); }

private:
const TimerCallback callback_; // 定时器回调函数
Timestamp expiration_; // 下一次的超时时刻
const double interval_; // 超时时间间隔,如果是一次性定时器,该值为0
const bool repeat_; // 是否重复
const int64_t sequence_; // 定时器序号

static AtomicInt64 s_numCreated_; // 定时器计数,当前已经创建的定时器数量
};
}
}
#endif // MUDUO_NET_TIMER_H

Timer源文件

timer.cc


// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/Timer.h>

using namespace muduo;
using namespace muduo::net;

AtomicInt64 Timer::s_numCreated_;

void Timer::restart(Timestamp now)
{
if (repeat_)
{
// 重新计算下一个超时时刻
expiration_ = addTime(now, interval_);
}
else
{
expiration_ = Timestamp::invalid();
}
}

TimerId头文件

timerid.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TIMERID_H
#define MUDUO_NET_TIMERID_H

#include <muduo/base/copyable.h>

namespace muduo
{
namespace net
{

class Timer;

///是一个不透明的ID ,是外部可见的一个类
/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
public:
TimerId()
: timer_(NULL),
sequence_(0)
{
}

TimerId(Timer* timer, int64_t seq)
: timer_(timer),
sequence_(seq)
{
}

// default copy-ctor, dtor and assignment are okay

friend class TimerQueue;

private:
Timer* timer_; //定时器的地址 ,timer里面也包含了timer的序号
int64_t sequence_; //定时器的序号
};

}
}

#endif // MUDUO_NET_TIMERID_H

TimerQueue头文件

timerqueue.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H

#include <boost/noncopyable.hpp>

#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>

namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
public:
Timer(const TimerCallback& cb, Timestamp when, double interval)
: callback_(cb),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }

void run() const
{

callback_();
}

Timestamp expiration() const { return expiration_; }
bool repeat() const { return repeat_; }
int64_t sequence() const { return sequence_; }

void restart(Timestamp now);

static int64_t numCreated() { return s_numCreated_.get(); }

private:
const TimerCallback callback_; // 定时器回调函数
Timestamp expiration_; // 下一次的超时时刻
const double interval_; // 超时时间间隔,如果是一次性定时器,该值为0
const bool repeat_; // 是否重复
const int64_t sequence_; // 定时器序号

static AtomicInt64 s_numCreated_; // 定时器计数,当前已经创建的定时器数量
};
}
}
#endif // MUDUO_NET_TIMER_H

TimerQueue源文件

timerqueue.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#define __STDC_LIMIT_MACROS
#include <muduo/net/TimerQueue.h>

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Timer.h>
#include <muduo/net/TimerId.h>

#include <boost/bind.hpp>

#include <sys/timerfd.h>

namespace muduo
{
namespace net
{
namespace detail
{

// 创建定时器
int createTimerfd()
{
int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0)
{
LOG_SYSFATAL << "Failed in timerfd_create";
}
return timerfd;
}

// 计算超时时刻与当前时间的时间差
struct timespec howMuchTimeFromNow(Timestamp when)
{
int64_t microseconds = when.microSecondsSinceEpoch()
- Timestamp::now().microSecondsSinceEpoch();
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts;
ts.tv_sec = static_cast<time_t>(
microseconds / Timestamp::kMicroSecondsPerSecond);
ts.tv_nsec = static_cast<long>(
(microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
return ts;
}

// 清除定时器,避免一直触发
void readTimerfd(int timerfd, Timestamp now)
{
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
if (n != sizeof howmany)
{
LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
}
}

// 重置定时器的超时时间
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
bzero(&newValue, sizeof newValue);
bzero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}

}
}
}

using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;

TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
boost::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
/*把timerfd 加入到poller来关注*/
timerfdChannel_.enableReading();
}

TimerQueue::~TimerQueue()
{
::close(timerfd_);
// do not remove channel, since we're in EventLoop::dtor();
for (TimerList::iterator it = timers_.begin();
it != timers_.end(); ++it)
{
delete it->second;
}
}
/*增加一个定时器addTimer()----->addTimerInLoop()*/
TimerId TimerQueue::addTimer(const TimerCallback& cb,
Timestamp when,
double interval)
{

/**
注意:addTimer虽然是线程安全的,但是这里把安全实现的代码给注释掉了,所以以下的代码必须在所属
的EventLoop IO线程中调用
*/
Timer* timer = new Timer(cb, when, interval);
/*
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
*/
addTimerInLoop(timer);
return TimerId(timer, timer->sequence());
}

void TimerQueue::cancel(TimerId timerId)
{
/*
loop_->runInLoop(
boost::bind(&TimerQueue::cancelInLoop, this, timerId));
*/
cancelInLoop(timerId);
}
/*添加定时器到所属eventloop IO线程中*/
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();
// 插入一个定时器,有可能会使得最早到期的定时器发生改变
bool earliestChanged = insert(timer);

if (earliestChanged)
{
// 重置定时器的超时时刻(timerfd_settime)
resetTimerfd(timerfd_, timer->expiration());
}
}

/*取消某个timer*/
void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
// 查找该定时器
ActiveTimerSet::iterator it = activeTimers_.find(timer);
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please,如果用了unique_ptr,这里就不需要手动删除了
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
// 已经到期,并且正在调用回调函数的定时器 , 那么将其加到cancelingtimers中,
// 以便在其回调处理完后, 在reset(expired, now)里时无效,不需要重置
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}

/*定时器触发的回调函数*/
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now); // 清除该事件,避免一直触发

// 获取该时刻之前所有的定时器列表(即超时定时器列表)
std::vector<Entry> expired = getExpired(now);
//已经处于处理到期定时器当中
callingExpiredTimers_ = true;
//清除上一次的超时定时器队列
cancelingTimers_.clear();
// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
// 这里回调定时器处理函数
it->second->run();
}
callingExpiredTimers_ = false;

// 不是一次性定时器,需要重启
reset(expired, now);
}

// rvo 优化
//返回已经超时的timers
// TimerQueue = 1,1,1,3,4,5,7,9
// 那么触发第一个1时,其实后面的2个1也被触发了
// timerQueue 只管队列中的第一个定时器
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired;
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// 返回第一个未到期的Timer的迭代器
// lower_bound的含义是返回第一个值>=sentry的元素的iterator
// 即*end >= sentry,从而end->first > now
TimerList::iterator end = timers_.lower_bound(sentry);
assert(end == timers_.end() || now < end->first);
// 将到期的定时器插入到expired中
std::copy(timers_.begin(), end, back_inserter(expired));
// 从timers_中移除到期的定时器
timers_.erase(timers_.begin(), end);

// 从activeTimers_中移除到期的定时器
for (std::vector<Entry>::iterator it = expired.begin();it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}

assert(timers_.size() == activeTimers_.size());
return expired;
}

void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;

for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
// 如果是重复的定时器并且是未取消定时器(未被其他线程取消掉),则重启该定时器
// cancelingTimers_.find(timer) == cancelingTimers_.end() 在取消的队列中找到不到timer
if (it->second->repeat()&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it->second->restart(now);
insert(it->second);
}
else
{
// 一次性定时器或者已被取消的定时器是不能重置的,因此删除该定时器
// FIXME move to a free list
delete it->second; // FIXME: no delete please
}
}

if (!timers_.empty())
{
// 获取最早到期的定时器超时时间
nextExpire = timers_.begin()->second->expiration();
}

if (nextExpire.valid())
{
// 重置定时器的超时时刻(timerfd_settime)
resetTimerfd(timerfd_, nextExpire);
}
}

bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
// 最早到期时间是否改变
bool earliestChanged = false;
//获取传进来的定时器的超时时间
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
// 如果timers_为空或者when小于timers_中的最早到期时间
if (it == timers_.end() || when < it->first)
{

earliestChanged = true;
}
{
// 插入到timers_中
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
/*断言插入操作是否成功*/
assert(result.second); (void)result;
}
{
// 插入到activeTimers_中
std::pair<ActiveTimerSet::iterator, bool> result
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(result.second); (void)result;
}
//断言是否成功操作了
assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » muduo net library

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址