
Implementing Queues for Event-Driven Programs

Author: “No Bugs” Hare
Job Title: Sarcastic Architect
Hobbies: Thinking Aloud , Arguing with Managers , Annoying HRs ,
Calling a Spade a Spade , Keeping Tongue in Cheek


[[ This is Chapter XIII(d) from “beta” Volume 2 of the upcoming book “Development&Deployment of Multiplayer Online Games”, which is currently being beta-tested. Beta-testing is intended to improve the quality of the book, and provides a free e-copy of the “release” book to those who help with improving; for further details see “Book Beta Testing“. All the content published during Beta Testing is subject to change before the book is published.

To navigate through the book, you may want to use  Development&Deployment of MOG: Table of Contents . ]]

We’ve already discussed things related to sockets; now let’s discuss the stuff which is often needed (in particular, it is of Utmost Importance when implementing Reactors), but which is not common enough to be universally available as a part of the operating system.

I’m speaking about queues. And not just about “some” queue, but about queues which have certain properties desirable for our Reactors a.k.a. ad-hoc Finite State Machines a.k.a. Event-Driven Programs.

Simple MWSR Queue

What we usually need from our Queue, is an ability to push asynchronous messages/events there (usually from different threads), and to get them back (usually from one single thread) – in FIFO order, of course. Reading from an empty queue MUST block until something appears there; this is necessary to avoid polling. On the other hand, writing MAY block if the queue is full, though in practice this should happen Really Rarely.

Let’s consider the following simple implementation (with no blocking, as our queue cannot become “full”):

#include <cassert>
#include <condition_variable>
#include <mutex>
#include <utility>

template <class Collection>
class MWSRQueue {
  private:
  std::mutex mx;
  std::condition_variable waitrd;
  Collection coll;
  bool killflag = false;

  public:
  using T = typename Collection::value_type;

  MWSRQueue() {
  }

  void push_back(T&& it) {
    //as a rule of thumb, DO prefer move semantics for queues
    //  it reduces the number of potential allocations
    //  which happen under the lock(!), as such extra
    //  unnecessary allocations due to unnecessary copying
    //  can have Bad Impact on performance
    //  because of significantly increased mutex contention
    {//creating scope for lock
    std::unique_lock<std::mutex> lock(mx);
    coll.push_back(std::move(it));
    }//unlocking mx

    waitrd.notify_one();
    //Yep, notifying outside of lock is usually BETTER.
    //  Otherwise the other thread would be released
    //  but will immediately run into
    //  our own lock above, causing unnecessary
    //  (and Very Expensive) context switch
  }

  std::pair<bool,T> pop_front() {
    //returns pair<true,popped_value>,
    //  or - if the queue is being killed - pair<false,T()>
    std::unique_lock<std::mutex> lock(mx);
    while(coll.size() == 0 && !killflag) {
      waitrd.wait(lock);
    }
    if(killflag)
      return std::pair<bool,T>(false,T());
        //creates an unnecessary copy of T(),
        //  but usually we won't care much at this point

    assert(coll.size() > 0);
    T ret = std::move(coll.front());
    coll.pop_front();
    lock.unlock();//unlocking mx before returning

    return std::pair<bool,T>(true, std::move(ret));
  }

  void kill() {
    {//creating scope for lock
    std::unique_lock<std::mutex> lock(mx);
    killflag = true;
    }//unlocking mx

    waitrd.notify_one();
  }
};

[[TODO!:test!]]

This is a rather naïve implementation of MWSR queues, but – it will work for quite a while, and it uses only very standard C++11, so it will work pretty much everywhere these days. More importantly, it does implement exactly the API which you need: you can push the items back from other threads, you can read your items from a single thread, and you can request that the wait (if any) is aborted, so your thread can terminate (for example, if you want to terminate your app gracefully). Moreover, our queue provides the whole API which you’ll ever need from your queue; this IS important as it means that you can re-implement your queue later if necessary in a better-performing manner, and nobody will notice the difference.
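For illustration, a minimal usage sketch might look as follows (the Event type, the handle() function, and the worker() reader loop are hypothetical and exist only for this example):

#include <list>

struct Event { int id; };//hypothetical event type
void handle(const Event& ev);//hypothetical handler, defined elsewhere

using EventQueue = MWSRQueue<std::list<Event>>;

void worker(EventQueue& q) {//the single reader thread
  while(true) {
    std::pair<bool,Event> res = q.pop_front();//blocks until an event arrives or kill() is called
    if(!res.first)
      break;//queue was killed - terminate gracefully
    handle(res.second);
  }
}

//writer threads (any number of them) simply call:
//  q.push_back(Event{42});
//and to shut the reader down:
//  q.kill();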

A nice (though side) implementation detail is that our template class MWSRQueue can use any collection which implements the usual-for-std-containers functions push_back(), pop_front(), and front(). It means that you can use std::list<> or std::deque<> directly, or make your own class which satisfies this API (for example, you can make your own prioritized queue). Oh BTW, and (by pure accident) it seems to be exception-safe too (even in a strong sense).

OTOH, this naïve implementation has several significant drawbacks, which MAY come into play as soon as we become concerned about performance and reliability. Let’s see these drawbacks one by one.

1 Note that std::priority_queue<> as such does NOT guarantee the order in case of elements with equal priority, so to make a FIFO-queue-with-priority out of it, you’ll need to make another adapter which assigns number-of-item-since-very-beginning as one of the parameters (and then sorts by the tuple (priority, number_of_item_since_very_beginning)). And DON’T forget about potential wraparounds too! That is, unless you’re using uint64_t as your number_of_item_since_very_beginning, in which case for most practical purposes you can demonstrate that wraparound will never happen. A minimal sketch of such an adapter is shown below.
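To illustrate the footnote above, here is a minimal sketch of such a FIFO-preserving priority adapter (the PrioritizedFifo name and all of its members are purely illustrative and not from the book; it assumes a uint64_t sequence counter, so wraparound is not a practical concern):

#include <cstddef>
#include <cstdint>
#include <queue>
#include <tuple>
#include <vector>

template<class T>
class PrioritizedFifo {//illustrative adapter, not from the book
  using Entry = std::tuple<int, uint64_t, T>;//(priority, seq, payload)
  struct Cmp {
    bool operator()(const Entry& a, const Entry& b) const {
      //lower (priority, seq) is served first; equal priorities keep FIFO order
      return std::tie(std::get<0>(a), std::get<1>(a)) >
             std::tie(std::get<0>(b), std::get<1>(b));
    }
  };
  std::priority_queue<Entry, std::vector<Entry>, Cmp> pq;
  uint64_t seq = 0;//with uint64_t, wraparound will never happen in practice

  public:
  using value_type = T;
  void push_back(T&& t, int priority = 0) {
    pq.push(Entry(priority, seq++, std::move(t)));
  }
  const T& front() const { return std::get<2>(pq.top()); }
  void pop_front() { pq.pop(); }
  size_t size() const { return pq.size(); }
};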

Fixed-Size Queues

As our class MWSRQueue above is organized, queue size may grow indefinitely. This might look like a Good Thing from a theoretical point of view (“hey, we don’t put any limits on our Queue”), but in the real world it often causes severe issues. For example, if for some reason one of your servers/Reactors starts to delay processing (or even hangs), such infinite-sized queues can easily eat up all the available RAM, causing swap or denial of allocations, and potentially affecting MANY more players than it should.

Flow Control

Let’s consider what will happen in the case of one of the Reactors hanging/slowing-down if we limit the size of ALL our queues within the system.

If we limit the sizes of ALL our queues, AND all our connections are TCP, then in case of severe overload the following scenario will unfold. First, one queue (the one close to the slow Reactor) will get full; in turn, the queue being full will cause the TCP thread which fills it to block. Then, the TCP thread on the other side of the TCP connection will find that it cannot push data into TCP, so it will block too. Then, the queue which feeds that TCP thread on the pushing side will get full. Then, the sending Reactor’s supposedly-non-blocking function sendMessage() will be unable to push the message into the queue-which-just-became-full, so our supposedly-non-blocking Reactor will block.

As we can see, when working with all flow-controlled transports (TCP is flow-controlled, and a fixed-size queue is flow-controlled too), severe delays tend to propagate from the target to the source. Whether this is good or not depends on the specifics of your system, though from what I’ve seen, in most cases such propagating delays are at least not worse than exhausting RAM, which happens in the case of infinite queues.

Also, it gives us back control over what-we-want-to-do in case of such a problem. For example, consider one Reactor which processes messages from pretty-much-independent channels and feeds them to different Reactors; to prevent it from blocking ALL the channels when just one of the target Reactors is slow or hanged, we MAY be able to “teach” our single Reactor to postpone just the messages from the affected channel, while working with the other channels as usual. Implementing it would require two things: (a) adding a trySendMessage() function, which tries to send a message and returns a “send wait handle” if the sending is unsuccessful, and (b) adding a list_of_wait_handles parameter to the pop_front() function, with the understanding that if some space becomes available in any of the “send wait handle”s, pop_front() stops the wait and returns that “send wait handle” to the caller (and then the infrastructure code will need to send a message/call a callback or continuation from our Reactor).
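A very rough sketch of what these two additions might look like interface-wise is shown below (the SendWaitHandle, PopResult, Message, Event, and ChannelId types, and the exact signatures, are my own guesses for illustration, not a definitive API):

#include <memory>
#include <vector>

struct Message {};       //hypothetical message type
struct Event {};         //hypothetical incoming event
struct SendWaitHandle {};//opaque "send wait handle"; hypothetical
using SendWaitHandlePtr = std::shared_ptr<SendWaitHandle>;
using ChannelId = int;   //hypothetical channel identifier

//(a) try to send; instead of blocking the Reactor on a full queue,
//    return a handle which can be waited on later
SendWaitHandlePtr trySendMessage(ChannelId target, Message&& msg);
  //returns nullptr on success,
  //  or a "send wait handle" if the target queue is currently full

//(b) pop_front() which also watches a list of "send wait handles"
struct PopResult {
  bool is_event;
  Event ev;                  //valid if is_event == true
  SendWaitHandlePtr writable;//valid if is_event == false
};
PopResult pop_front(const std::vector<SendWaitHandlePtr>& list_of_wait_handles);
  //returns either an incoming event,
  //  or a "send wait handle" which got some free space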

2 In the case when there is no TCP between Reactors, so that the Reactors are interacting directly, the sending Reactor’s supposedly-non-blocking sendMessage() will block immediately, as described below.

Dropping Packets

When dealing with messages coming over TCP or over internal communications, we’re usually relying on ALL the messages being delivered (and in order too); that’s why dropping messages is usually not an option on these queues.

However, for UDP packets, there is always an option to drop them if the incoming queue is full; this is possible because any UDP packet can be dropped anyway, so our upper-level protocols need to handle dropped packets regardless of whether we drop some packets at the application level. Moreover, we can implement a selective packet drop if we feel like it (for example, we can drop less important traffic in favor of more important traffic).

3 strictly speaking, if you DO implement reliable inter-Server communications as described in Chapter III, you MAY be able to force-terminate TCP connection, AND to drop all the messages from that connection from the queue too. Not sure whether it is ever useful though.

4 Or even almost-full; see, for example, the RED family of congestion avoidance algorithms.

Full Queues are Abnormal: Size and Tracking

Regardless of the whether-to-block-or-to-drop choice outlined above, full queues SHOULD NOT happen during normal operation; they’re more like a way to handle scenarios when something has Already Gone Wrong, and to recover from them while minimizing losses. That’s why it is Really Important to keep track of all the queue blocks (due to the queue being full), and to report them to your monitoring system; for this purpose, our queues should provide counters so that infrastructure code can read them and report them to a system-wide monitor (see more on monitoring in Vol.3); a tiny accessor along these lines is sketched below.
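For instance, one way to expose such counters is a small member function like the following (the QueueStats struct and the getStats() name are illustrative, not from the book; it is meant to be added to the fixed-size queue classes defined below, which carry the nfulls and hwmsize counters):

struct QueueStats {//illustrative only; not from the book
  int nfulls;      //number of times the queue was found full
  size_t hwmsize;  //high watermark on queue size
};

//member function which could be added to MWSRFixedSizeQueueWithFlowControl below,
//  so that infrastructure code can poll and report the counters
QueueStats getStats() {
  std::unique_lock<std::mutex> lock(mx);//read the counters under the same mutex
  return QueueStats{ nfulls, hwmsize };
}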

Now let’s discuss a question of maximum size of our fixed-size queues. On the one hand, we obviously do NOT want to have any kind of swapping because of the memory allocated to our fixed-size queues. On the other hand, we cannot have our queues limited to maximum size of 2 or 3. If our queue is too small, then we can easily run into scenarios of starvation, when our Reactor is effectively blocked by the flow control mechanisms from doing things (while there is work somewhere in the system, it cannot reach our Reactor). In the extreme cases (and ultra-small sizes like 2 or 3), we can even run into deadlocks (!).

My recommendation when it comes to maximum size of the queues, goes as follows:

  • Start with a maximum size of between 100 and 1000; most of the time, it should be large enough to stay away from blocks and also to avoid allocating too much memory for the queues.
  • DO monitor maximum sizes in production (and especially “queue is full” conditions), and act accordingly.

Implementing Fixed-Size Queue with Flow Control

Now, after we’ve specified what we want, we’re ready to define our own Fixed-Size Queues. Let’s start with a Fixed-Size Queue with Flow Control:

template <class FixedSizeCollection>
class MWSRFixedSizeQueueWithFlowControl {
  private:
  std::mutex mx;
  std::condition_variable waitrd;
  std::condition_variable waitwr;
  FixedSizeCollection coll;
  bool killflag = false;

  //stats:
  int nfulls = 0;
  size_t hwmsize = 0;//high watermark on queue size

  public:
  using T = typename FixedSizeCollection::value_type;

  MWSRFixedSizeQueueWithFlowControl() {
  }

  void push_back(T&& it) {
    //if the queue is full, BLOCKS until some space is freed
    {//creating scope for lock
    std::unique_lock<std::mutex> lock(mx);
    while(coll.is_full() && !killflag) {
      waitwr.wait(lock);
      ++nfulls;
      //this will also count spurious wakeups,
      //  but they're supposedly rare
    }

    if(killflag)
      return;//queue is being killed - drop the item

    assert(!coll.is_full());
    coll.push_back(std::move(it));
    size_t sz = coll.size();
    hwmsize = std::max(hwmsize,sz);
    }//unlocking mx

    waitrd.notify_one();
  }

  std::pair<bool,T> pop_front() {
    std::unique_lock<std::mutex> lock(mx);
    while(coll.size() == 0 && !killflag) {
      waitrd.wait(lock);
    }
    if(killflag)
      return std::pair<bool,T>(false,T());

    assert(coll.size() > 0);
    T ret = std::move(coll.front());
    coll.pop_front();
    lock.unlock();//unlocking mx before notifying

    waitwr.notify_one();

    return std::pair<bool,T>(true, std::move(ret));
  }

  void kill() {
    {//creating scope for lock
    std::unique_lock<std::mutex> lock(mx);
    killflag = true;
    }//unlocking mx

    waitrd.notify_one();
    waitwr.notify_all();//there MAY be more than one blocked writer
  }
};

Implementing Fixed-Size Queue with a Drop Policy

And here goes a Fixed-Size Queue with a Drop Policy:

template <class FixedSizeCollection, class DropPolicy>
  // DropPolicy should have function
  //    pushAndDropOne(T&& t, FixedSizeCollection& coll)
  //    it MAY either skip t,
  //    OR drop something from coll while pushing t
class MWSRFixedSizeQueueWithDropPolicy {
  private:
  DropPolicy drop;
  std::mutex mx;
  std::condition_variable waitrd;
  FixedSizeCollection coll;
  bool killflag = false;

  //stats:
  int ndrops = 0;
  size_t hwmsize = 0;//high watermark on queue size

  public:
  using T = typename FixedSizeCollection::value_type;

  MWSRFixedSizeQueueWithDropPolicy(const DropPolicy& drop_)
  : drop(drop_) {
  }

  void push_back(T&& it) {
    //if the queue is full, calls drop.pushAndDropOne()
    {//creating a scope for lock
    std::unique_lock<std::mutex> lock(mx);

    if(coll.is_full()) {//you MAY want to use
                        //  unlikely() here
      ++ndrops;
      drop.pushAndDropOne(std::move(it), coll);
      return;
    }

    assert(!coll.is_full());
    coll.push_back(std::move(it));
    size_t sz = coll.size();
    hwmsize = std::max(hwmsize,sz);
    }//unlocking mx

    waitrd.notify_one();
  }

  std::pair<bool,T> pop_front() {
    std::unique_lock<std::mutex> lock(mx);
    while(coll.size() == 0 && !killflag) {
      waitrd.wait(lock);
    }

    if(killflag)
      return std::pair<bool,T>(false,T());

    assert(coll.size() > 0);
    T ret = std::move(coll.front());
    coll.pop_front();
    lock.unlock();//unlocking mx

    return std::pair<bool,T>(true, std::move(ret));
  }

  void kill() {
    {//creating scope for lock
    std::unique_lock<std::mutex> lock(mx);
    killflag = true;
    }//unlocking mx

    waitrd.notify_one();
  }
};
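As an illustration of what a DropPolicy might look like, here are two trivial policies (DropOldestPolicy and SkipNewestPolicy are my own illustrative names, not something from the book):

template<class FixedSizeCollection>
struct DropOldestPolicy {//illustrative policy: drop the oldest queued item
  using T = typename FixedSizeCollection::value_type;
  void pushAndDropOne(T&& t, FixedSizeCollection& coll) {
    coll.pop_front();            //drop the oldest item...
    coll.push_back(std::move(t));//...to make room for the new one
  }
};

//a policy which prefers to skip the new item instead is even simpler:
template<class FixedSizeCollection>
struct SkipNewestPolicy {
  using T = typename FixedSizeCollection::value_type;
  void pushAndDropOne(T&&, FixedSizeCollection&) {
    //silently skip t; the queue stays as it is
  }
};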

Performance Issues

As we’re running our system, we MAY run into performance issues; sometimes, it is those queues which cause us trouble.

With queues-implemented-over-mutexes like the ones we’ve written above, the most annoying thing performance-wise is that there is a chance that the OS’s scheduler will force a preemptive context switch right when the thread-being-preempted is owning our mutex. This will cause quite a few context switches going back and forth. Such unnecessary context switches have a Big Fat impact on performance (as discussed in [TODO], a context switch can cost up to a million CPU clocks).

5 Most of the time, such Bad Cases won’t apply to the kind of context switches we’re discussing here, but several context switches, each costing 10K CPU clocks, are already Pretty Bad.

To deal with it, two approaches are possible. Approach #1 would be simply to

Reduce Time Under Lock

As we reduce the time spent under the mutex lock, the chances of that unfortunate context switch can be reduced to almost zero (if we’re doing a Really Good Job, time-under-lock can be as little as a hundred CPU clocks, so the chances of being forcibly switched out there become minimal). And with the lock not being contended, the cost of acquiring/releasing the lock usually comes down to just two atomic/LOCK/Interlocked operations (and you cannot really do better than that).

Removing Allocations from Under the Lock

A mathematician is asked “how to boil water?” His answer goes as follows:

Let’s consider two cases. In the first case, there is no water in the kettle.

Then, we need to light a fire, put some water into the kettle,

place the kettle over the fire, and wait for some time.

In the second case, there is water in the kettle.

Then we need to pour the water out, and the problem is reduced to the previous case.

— A mathematician who Prefers to stay Anonymous —

Now, let’s see what we can do to reduce time under the lock. If we take a closer look at our class MWSRQueue , we’ll realize that all the operations under the lock are very minimal, except for potential allocations (and/or O(N) operations to move things around).

The problem is that none of the existing std:: containers provides a guarantee that there are neither allocations/deallocations nor O(N) operations within their respective push_back() and pop_front() operations.

  • std::list<>::push_back()/pop_front() – allocation/deallocation; some implementations MAY cache or pool allocations, but such optimizations are implementation-specific.
  • std::vector<>::erase(begin()) (as a replacement for pop_front()) – O(N).
  • std::deque<>::push_back()/pop_front() – allocation/deallocation; some implementations MAY cache or pool allocations, but such optimizations are implementation-specific.

I know of two ways to deal with this problem. First, it is possible to use some kind of pool allocation and feed the pool allocator to std::list<> or std::deque<> (effectively guaranteeing that all the items are always taken from the pool and nothing else). However, IMO this solution, while workable, looks too much like the way the mathematician gets the kettle boiled (see the epigraph to this subsection).

Instead, I suggest to do the following:

  • If you need an infinite-size queue, you can use “intrusive lists” (allocating list elements outside the mutex lock, and reducing contention)
  • If you need a fixed-size queue, then you can create your own Collection based on circular buffer along the following lines:
#include <cassert>
#include <cstdint>
#include <new>
#include <utility>

template<class T, size_t maxsz_bits>
class CircularBuffer {
  static constexpr size_t bufsz = size_t(1) << maxsz_bits;
  static constexpr size_t maxsz = bufsz - 1;
    //-1 to make sure that head==tail always means 'empty'
  static constexpr size_t mask = maxsz;
  size_t head = 0;
  size_t tail = 0;
  alignas(T) uint8_t buffer[bufsz * sizeof(T)];

  T* item(size_t idx) {//address of the idx-th slot within the raw buffer
    return reinterpret_cast<T*>(&buffer[idx * sizeof(T)]);
  }

  public:
  using value_type = T;

  size_t size() {
    return head - tail + (((head >= tail) - 1) & bufsz);
      //trickery to avoid pipeline stalls via arithmetic
      //supposedly equivalent to:
      //if(head >= tail)
      //  return head - tail;
      //else
      //  return head + bufsz - tail;
  }

  bool is_full() { return size() == maxsz; }

  void push_back(T&& t) {
    assert(size() < maxsz);
    new(item(head)) T(std::move(t));
    head = (head + 1) & mask;
  }

  T& front() {
    assert(size() > 0);
    return *item(tail);
  }

  void pop_front() {
    assert(size() > 0);
    item(tail)->~T();
    tail = (tail + 1) & mask;
  }
};
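With such a collection in place, plugging it into the fixed-size queue above might look as follows (MyEvent and the choice of 2^10-1 = 1023 entries are illustrative assumptions, not something prescribed by the book):

//a fixed-size queue of up to 1023 events, with no allocations under the lock
struct MyEvent { int id; };//hypothetical event type

using MyQueue = MWSRFixedSizeQueueWithFlowControl< CircularBuffer<MyEvent, 10> >;

//usage from infrastructure code:
//  MyQueue q;
//  q.push_back(MyEvent{1});   //from writer threads
//  auto res = q.pop_front();  //from the single reader thread
//  if(res.first) { /*process res.second*/ }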

Removing Locks Completely

The second approach is MUCH more radical – it is to remove locks completely. And at first glance, it seems that it is easy to find an appropriate “lockless queue” library. However, there is a caveat:

We do NOT really need “completely lockless queue”. What we need, is a “queue which is lockless until it becomes empty or full”

In other words, our (almost-)lockless queue still needs to lock (otherwise we’d need to poll it, which puts us in a predicament between sacrificing latency and burning CPU cycles in a MUCH worse manner than any losses from the very-infrequent context switches on barely-loaded locks).

Unfortunately, I do NOT know of any readily-available library which supports such “blocking-only-when-necessary” queues. Writing such a thing yourself is certainly possible, but keep in mind that it is going to be a Really Major Effort even if you’re proficient in writing synchro primitives (and an Even More Major Effort to debug/test it and to prove its correctness). Overall, if considering the complexity of writing such a “blocking-only-when-necessary” queue from the point of view of the exercises in Knuth’s “The Art of Computer Programming”, I would rate it at around 40 (with “50” being a “not-yet-proven theorem”).

6 yes, for non-trivial primitives such proof is necessary, even if it is done by an exhaustive analysis of all the context switches in all the substantially different points – of course, not forgetting about those nasty ABA problems

Waiting for Other Stuff

More often than not, in addition to waiting for incoming events, we MAY want to wait for “something else”. Examples of these “something else” things range from “something coming in from socket” to “user moving mouse”.

Of course, we could dedicate a thread to waiting for several sockets (user input, DNS response, whatever-else) and pushing the result to one of our MWSR Queues, but that means extra context switches, and therefore is not always optimal.

In such cases, we MAY want to use some OS-specific mechanism which allows us to wait for several such things simultaneously. Examples of such mechanisms include:

  • Using select() (poll()/epoll()) as a queue (not exactly that OS-specific, but still different enough to be mentioned here). If MOST of your IO is sockets, and everything-else (like “a message coming in from another thread”) happens very occasionally, then it often makes sense to use select() etc. to deal with sockets – and with anything else too (with absolutely no mutexes etc. in sight). To deal with those very-occasional other events (which cannot be handled via select()/poll()/epoll() because they’re not file handles, or because they’re regular files(!)), a separate anonymous pipe (or equivalent) can be created, which can be listened to by the very same select()-like function. Bingo! Most of the things are handled with select()/poll()/epoll()/… without any unnecessary context switches, and the very-occasional stuff is occasional enough to ignore the associated (usually not-too-bad) overhead of sending it over the pipe. A minimal sketch of this technique is shown after this list.
    • Note, however, that this approach does NOT work too well performance-wise when most of your events CANNOT be handled by the select()-like function directly (and need to be simulated over that pipe). While such a thing WILL work, the time spent on simulating events over pipes can become substantial :-(.
  • kqueue(). On BSD, kqueue() allows waiting not only on file handles, and provides more flexibility than epoll(); occasionally it allows avoiding an extra-thread-with-an-anonymous-pipe which would be necessary otherwise.
  • Win32 WaitForMultipleObjects(). WaitForMultipleObjects() can wait both for sockets and for “events”. This can be used to build a queue which can handle both sockets etc. and other stuff – all without those unnecessary context switches.
  • Win32 thread queues. Another Win32-specific mechanism is related to thread queues (and the GetMessage() function). These come in handy when you need to handle both Windows messages and something-else (especially when you need to do it in a UI thread).
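For illustration, here is a minimal sketch of the “anonymous pipe + select()” technique from the first bullet above (POSIX-only, with error handling omitted; the single-wakeup-byte convention and all the names are merely one possible way to do it, not a prescribed design):

#include <unistd.h>
#include <sys/select.h>
#include <algorithm>

//one end of an anonymous pipe is added to the select() set;
//other threads write a single byte to wake the event loop up
int wakeup_pipe[2];//[0] = read end (watched by select), [1] = write end

void init_wakeup() {
  pipe(wakeup_pipe);//error handling omitted for brevity
}

void wake_event_loop() {//called from other threads after pushing to an in-memory queue
  char b = 1;
  write(wakeup_pipe[1], &b, 1);
}

void event_loop(int sock) {//sock is a hypothetical socket we also listen to
  while(true) {
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(sock, &rfds);
    FD_SET(wakeup_pipe[0], &rfds);
    int maxfd = std::max(sock, wakeup_pipe[0]);
    select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);

    if(FD_ISSET(wakeup_pipe[0], &rfds)) {
      char buf[64];
      read(wakeup_pipe[0], buf, sizeof(buf));//drain the wakeup bytes
      //...then drain the in-memory queue of "other stuff" here
    }
    if(FD_ISSET(sock, &rfds)) {
      //...handle incoming socket data here
    }
  }
}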

On libuv

In a sense, libuv is The King when we speak about 3rd-party event handling libraries. It can take pretty much anything and make it asynchronous. However, being that universal comes at a price: libuv’s performance, while “pretty good”, is not “the best one possible”. In particular, the trickery described above can often outperform libuv.

[[TODO: IPC/shared-memory]]

[[To Be Continued…

This concludes beta Chapter XIII from the upcoming book “Development and Deployment of Multiplayer Online Games (from social games to MMOFPS, with stock exchanges in between)”. Stay tuned for beta Chapter XIV, where we’ll start discussing graphics (though ONLY as much as is necessary for a multiplayer gamedev).]]
