WWW.DUMAIS.IO

USING EPOLL

2014-10-10

Introduction

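I heard about epoll not so long ago and wanted to give it a try. epoll is a Linux-specific mechanism for monitoring many file descriptors (sockets, pipes, eventfds and so on) for I/O readiness, which makes it a natural fit for non-blocking sockets. It serves the same purpose as poll() and select(), but it scales much better when watching a large number of descriptors.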

epoll is easy enough to use. The good thing about it is that, unlike select(), you don't need to rebuild the fd_set each time you call epoll_wait(): the set of watched descriptors lives in the kernel and only changes when you call epoll_ctl(). Here is a typical usage:

  • efd = epoll_create1(0)
  • epoll_ctl(efd, EPOLL_CTL_ADD, listeningSocket, &event)
  • loop:
    • n = epoll_wait(efd, events, maxEvents, timeout)
    • for i = 0 to n-1:
      • if (events[i].data.fd == listeningSocket)
        • newsock = accept()
        • epoll_ctl(efd, EPOLL_CTL_ADD, newsock, &event)
      • else
        • read from events[i].data.fd
  • close(efd)
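The steps above can be turned into a small C++ program. This is a minimal, level-triggered sketch under my own assumptions: the helper names makeLoopbackListener() and pollOnce() are hypothetical, the server simply echoes back what it reads, and short writes are ignored for brevity.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cerrno>
#include <cstdint>
#include <cassert>

// Hypothetical helper: creates a TCP socket listening on 127.0.0.1:port.
// Returns the listening fd, or -1 on error.
int makeLoopbackListener(uint16_t port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
        listen(fd, 10) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}

// Hypothetical helper: runs one epoll_wait() round. New connections on
// listenFd are accepted and registered; data on other sockets is echoed back.
// Returns the number of events handled, or -1 on error.
int pollOnce(int efd, int listenFd, int timeoutMs) {
    epoll_event events[64];
    int n = epoll_wait(efd, events, 64, timeoutMs);
    if (n < 0) return errno == EINTR ? 0 : -1;
    for (int i = 0; i < n; ++i) {
        int fd = events[i].data.fd;
        if (fd == listenFd) {
            int newSock = accept(listenFd, nullptr, nullptr);
            if (newSock < 0) continue;
            epoll_event ev{};
            ev.events = EPOLLIN;               // level-triggered (the default)
            ev.data.fd = newSock;
            if (epoll_ctl(efd, EPOLL_CTL_ADD, newSock, &ev) != 0) close(newSock);
        } else {
            char buf[4096];
            ssize_t r = read(fd, buf, sizeof(buf));
            if (r <= 0) {                      // peer closed (0) or error (-1)
                close(fd);                     // also drops fd from the epoll set
                continue;
            }
            // Naive echo: a short write would silently lose the remainder.
            ssize_t w = write(fd, buf, static_cast<size_t>(r));
            (void)w;
        }
    }
    return n;
}
```

A caller would create the listener, call epoll_create1(0), register the listening socket with EPOLL_CTL_ADD and then call pollOnce(efd, lfd, -1) in a loop.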

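Another nice thing about epoll is that you don't always need to remove a socket from the set yourself. You can call epoll_ctl(efd, EPOLL_CTL_DEL, sock, NULL) if you want, but when the socket is closed, it is removed automatically. One caveat: the automatic removal only happens once every file descriptor referring to the same open file description has been closed, so a socket that was dup()'d or inherited across fork() stays registered.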
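The caveat is easy to observe with a pipe instead of a socket. In this sketch (the function name eventsAfterClose() is hypothetical), the read end of a pipe holding one byte is registered and then closed; epoll only forgets about it if no duplicate of that descriptor is still open.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>

// Hypothetical demo: registers the read end of a pipe that already holds one
// byte, closes that descriptor, then asks epoll_wait() (timeout 0) how many
// events it still reports. With keepDup, a dup() of the read end stays open.
int eventsAfterClose(bool keepDup) {
    int p[2];
    if (pipe(p) != 0) return -1;
    if (write(p[1], "x", 1) != 1) return -1;  // read end is now readable
    int efd = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = p[0];
    epoll_ctl(efd, EPOLL_CTL_ADD, p[0], &ev);
    int dupFd = keepDup ? dup(p[0]) : -1;     // second reference to the same open file
    close(p[0]);                              // close the registered descriptor
    epoll_event out[4];
    int n = epoll_wait(efd, out, 4, 0);
    if (dupFd >= 0) close(dupFd);
    close(p[1]);
    close(efd);
    return n;
}
```

Here eventsAfterClose(false) returns 0 and eventsAfterClose(true) returns 1. In the second case the event still carries the already-closed descriptor number in data.fd, which is why calling EPOLL_CTL_DEL before close() is safer when descriptors might be shared.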

One thread

Using epoll, I can do all my sending and receiving in one thread. Some people may suggest sending data directly from other threads, but what happens if you get EAGAIN? Assume that thread A is the epoll thread. Thread B attempts to send 100 bytes but can only send 50. Then thread C attempts to write 100 bytes on the same socket, and by the time thread C runs, the socket is writable again. C's bytes land right after B's first 50 bytes, so the remote end receives corrupted data. For that reason, threads B and C add their data to a queue so that each message is guaranteed to be sent completely and in order. The queue is emptied by thread A.
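One way to sketch that queue, assuming a std::mutex-protected std::deque (the author's framework avoids STL containers, so this is only an illustration): producers push complete messages, and only the epoll thread calls flush(), which remembers how much of the front message was already written. A partial write therefore never lets another message's bytes slip in between. The class name OutQueue is hypothetical.

```cpp
#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <cassert>

// Hypothetical per-connection transmit queue: many producers, one consumer.
class OutQueue {
public:
    // Called from any thread; the message is queued as one indivisible unit.
    void push(std::string msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(std::move(msg));
    }

    // Called only from the epoll thread, with fd in non-blocking mode.
    // Writes as much as the socket accepts and returns the number of bytes
    // still pending, or -1 on a real error.
    long flush(int fd) {
        std::lock_guard<std::mutex> lock(mutex_);
        while (!queue_.empty()) {
            const std::string& front = queue_.front();
            ssize_t n = send(fd, front.data() + offset_, front.size() - offset_, MSG_NOSIGNAL);
            if (n < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // resume on next write-ready
                if (errno == EINTR) continue;
                return -1;
            }
            offset_ += static_cast<size_t>(n);
            if (offset_ == front.size()) {     // whole message is out
                queue_.pop_front();
                offset_ = 0;
            }
        }
        long pending = 0;
        for (const std::string& m : queue_) pending += static_cast<long>(m.size());
        return pending - static_cast<long>(offset_);
    }

private:
    std::mutex mutex_;
    std::deque<std::string> queue_;
    size_t offset_ = 0;   // bytes of queue_.front() already written
};
```

For simplicity the lock is held during send(); since the socket is non-blocking, that call returns quickly. A non-zero return from flush() is the signal to wait for the socket to become writable before calling it again.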

Since epoll might handle thousands of connections, thread A must do minimal work. It only does this:

  • epoll_wait()
  • if a socket is read-ready, read everything from that socket and dispatch it to consumers. Consumers should work fast; otherwise the data should be "posted" to the consumer on another thread.
  • send everything in the send queue

Edge-triggered VS Level-Triggered

epoll offers two modes: edge-triggered and level-triggered. The default is level-triggered, so you need to request edge-triggered mode explicitly (with the EPOLLET flag) if that is what your application wants.

In level-triggered mode, epoll_wait() returns as long as a socket is read-ready or write-ready. So if data is waiting on the socket and you only read part of it, the next epoll_wait() call returns right away because there is still data available. Once the socket's receive buffer is empty, epoll_wait() no longer reports it as readable. But since a socket is write-ready almost all of the time, epoll_wait() will keep returning if you registered for EPOLLOUT, which is not something you would typically want. My guess is that in level-triggered mode you should not register for EPOLLOUT events unless you have something to send. While your application's TX buffer is empty, you don't register for EPOLLOUT. As soon as data is added to the TX buffer, you register for EPOLLOUT, epoll_wait() returns and you write the data out on the socket. If the write fails with EAGAIN, the socket will not be reported as writable again until space frees up in its send buffer, and you send the rest of your buffer when it is. Once the application's TX buffer is empty, you unregister for EPOLLOUT.
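Registering and unregistering for EPOLLOUT is done with EPOLL_CTL_MOD. A minimal sketch, assuming the socket was first added with EPOLLIN only; the helper name setWantWrite() is hypothetical.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <cassert>

// Hypothetical helper: turns EPOLLOUT interest on or off for a socket that is
// already registered (level-triggered) with EPOLLIN. Returns 0 on success.
int setWantWrite(int efd, int fd, bool wantWrite) {
    epoll_event ev{};
    uint32_t events = EPOLLIN;      // always keep listening for incoming data
    if (wantWrite) events |= EPOLLOUT;
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
}
```

The epoll thread would call setWantWrite(efd, fd, true) when data is queued for fd and setWantWrite(efd, fd, false) once that data has been fully written, so an idle writable socket never wakes epoll_wait() up.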

With edge-triggered mode, things are different. You only receive an event when the status changes from not-ready to ready. So if you have incoming data and you only read part of it, the socket is still readable when you call epoll_wait() again, but because its state did not change from not-ready to ready, epoll_wait() will block until new data arrives. So if you use this mode, you must make sure to read ALL the data on the socket until read() fails with EAGAIN, which also means the socket has to be non-blocking. Similarly, when you want to send something, you can only count on getting a write-ready event after a previous write attempt failed with EAGAIN.
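Reading until EAGAIN can be sketched as a small loop. This assumes a non-blocking socket registered with EPOLLIN | EPOLLET; the helper name drainSocket() is hypothetical.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include <string>
#include <cassert>

// Hypothetical helper for edge-triggered mode: reads from a non-blocking fd
// until read() fails with EAGAIN, appending everything to 'out'.
// Returns false if the peer closed the connection or a real error occurred.
bool drainSocket(int fd, std::string& out) {
    char buf[4096];
    for (;;) {
        ssize_t r = read(fd, buf, sizeof(buf));
        if (r > 0) {
            out.append(buf, static_cast<size_t>(r));
        } else if (r == 0) {
            return false;                         // orderly shutdown by the peer
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return true;                          // buffer empty: safe to epoll_wait() again
        } else if (errno != EINTR) {
            return false;                         // real error
        }
    }
}
```

Once drainSocket() returns true, the receive buffer is empty, so the next edge (new data arriving) is guaranteed to produce another event.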

Let's say you have an internal transmit queue to which you add data when you want to send it out. The epoll thread would need to read from that queue, write the data on the socket and handle EAGAIN appropriately:

  • epoll_wait()
  • if a socket is read-ready, readAllFromThatSocket(socket);
  • sendAllData() // sends all that is contained in the internal queue.
  • goto step 1

But if epoll_wait() is blocked in edge-triggered mode and you add data to the queue from another thread, sendAllData() will not be called until epoll_wait() returns because some socket has data to be received. It won't return because a socket is write-ready, since you first need to write and get EAGAIN for that. To solve this problem, I created an eventfd (see sys/eventfd.h). I add the eventfd to the epoll set, and whenever I add data to the application's TX queue, I write 1 to the eventfd so that epoll_wait() wakes up. I could have used a timeout in epoll_wait(), but that would add unnecessary latency to my reactor. This technique is similar to what is called the "self-pipe trick".
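The eventfd wake-up can be sketched as a small class. The name Waker and its methods are hypothetical; eventfd(), EFD_NONBLOCK and EFD_CLOEXEC are the real Linux API from sys/eventfd.h.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cassert>

// Hypothetical wake-up helper: an eventfd registered in the epoll set so that
// another thread can make epoll_wait() return after queuing outgoing data.
class Waker {
public:
    explicit Waker(int efd) : fd_(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) {
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = fd_;
        epoll_ctl(efd, EPOLL_CTL_ADD, fd_, &ev);
    }
    ~Waker() { close(fd_); }
    Waker(const Waker&) = delete;
    Waker& operator=(const Waker&) = delete;

    int fd() const { return fd_; }

    // Producer side: adds 1 to the eventfd counter, making it readable.
    void wake() {
        uint64_t one = 1;
        ssize_t n = write(fd_, &one, sizeof(one));
        (void)n;  // can only fail if the counter would overflow
    }

    // Epoll-thread side: reading returns the counter and resets it to 0, so
    // the eventfd stays quiet until the next wake().
    void reset() {
        uint64_t value;
        ssize_t n = read(fd_, &value, sizeof(value));
        (void)n;  // EAGAIN just means nobody called wake()
    }

private:
    int fd_;
};
```

In the reactor loop, an event whose data.fd equals waker.fd() would trigger reset() followed by sendAllData(). Several wake() calls between two epoll_wait() calls collapse into a single event, which is exactly what a "something was queued" signal needs.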

My framework

To play with epoll, I wrote a small reactor framework that exposes an easy-to-use interface. All the gory details are hidden inside the framework. To use it, you need to define two classes:

    class OwnProtocolFramingStrategy: public IFramingStrategy
    {
    protected:
        virtual int processData(char* buffer, size_t size, FullMessageInfo& info)
        {
            // info.buffer will first be zero. You need to allocate memory for that
            // buffer and copy the data from "buffer" to "info.buffer".
            //
            // If you determine that the buffer does not contain a full message, return
            // the number of bytes you read from the buffer. In this case, it should be
            // "return size;". The next time some data is received, this function will
            // be called again with your buffer. You can update other fields in "info"
            // to help you resume next time.
            //
            // If you determine that a full message was received, set
            // "info.complete = true" and return the number of bytes that you read from
            // the buffer. After returning from this function, the buffer created in
            // "info.buffer" will be considered as containing a full message and will be
            // passed to the TcpEngine. The next time this function is called,
            // info.buffer will be back to zero.
            //
            // If you determine that a full message was received but there is still data
            // left in the buffer, you probably have 2 messages in the buffer. As
            // mentioned above, return the number of bytes that you read from the buffer.
            // This function will be called again with "buffer" pointing just after the
            // last byte you read. So you only need to parse one message each time this
            // function is called.
            //
            // If you find that the data is not well-formed, does not respect your
            // protocol or has any other error that prevents you from reliably building
            // a full message, return -1 and the client's connection will be aborted.
        }
    };

An IFramingStrategy is a class that turns the TCP stream into full messages. As you know, when reading data from a socket, you may receive more than one message in a single read, or only half of a message. The IFramingStrategy is what parses the stream and builds messages. For example, when implementing an HTTP server, the strategy would work out the message size from the Content-Length header or by decoding chunked transfer encoding, so that logic would live here.
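To make the processData() contract concrete, here is a sketch for a made-up protocol where each message is a 2-byte big-endian length followed by that many payload bytes. FullMessageInfo below is a hypothetical stand-in for the framework's struct (its expected, received and header fields are my own additions), and processData() is written as a free function so it can be tried on its own.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <cassert>

// Hypothetical stand-in for the framework's FullMessageInfo.
struct FullMessageInfo {
    char* buffer = nullptr;            // message being assembled (malloc'd)
    size_t expected = 0;               // payload size read from the header
    size_t received = 0;               // frame bytes (header + payload) seen so far
    unsigned char header[2] = {0, 0};  // header bytes, which may arrive split
    bool complete = false;
};

// Returns the number of bytes consumed from 'buffer', or -1 if the frame is
// invalid. Parses at most one message per call, as the contract requires.
int processData(const char* buffer, size_t size, FullMessageInfo& info) {
    size_t used = 0;
    // Collect the 2-byte header, which may itself be split across reads.
    while (info.received < 2 && used < size) {
        info.header[info.received++] = static_cast<unsigned char>(buffer[used++]);
    }
    if (info.received < 2) return static_cast<int>(used);
    if (info.buffer == nullptr) {
        info.expected = (static_cast<size_t>(info.header[0]) << 8) | info.header[1];
        if (info.expected == 0) return -1;          // this sketch rejects empty frames
        info.buffer = static_cast<char*>(std::malloc(info.expected));
        if (info.buffer == nullptr) return -1;
    }
    size_t have = info.received - 2;                // payload bytes already copied
    size_t take = info.expected - have;
    if (take > size - used) take = size - used;     // copy only what is available
    std::memcpy(info.buffer + have, buffer + used, take);
    used += take;
    info.received += take;
    info.complete = (info.received - 2 == info.expected);
    return static_cast<int>(used);
}
```

Fed the bytes {0, 2, 'h', 'i', 0, 3, 'a', 'b', 'c'} in one read, the first call consumes 4 bytes and completes "hi"; a second call on the remaining 5 bytes (with a fresh info) completes "abc".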

    class OwnProtocolServerEngine: public TcpEngine
    {
    public:
        OwnProtocolServerEngine(): TcpEngine(new ClientFactory(this))
        {
        }

    protected:
        virtual void onClientConnected(TcpClient* client)
        {
            // Maybe add the client to some list so we can use it later?
        }

        virtual void onClientDisconnected(TcpClient* client)
        {
            // If the client was added to a list, remove it now, because its
            // instance will be deleted after returning from this method.
        }

        virtual void onClientMessage(TcpClient* client, char* buffer, size_t size)
        {
            // The buffer you get here is the one that was created in the strategy.
            // You own this buffer in this method, so you must free it here (or
            // remember to free it later).
            //
            // In this method, you are guaranteed to get exactly 1 full message,
            // assuming that the strategy was coded correctly.
            //
            // Note that everything happens in 1 thread. So if onClientConnected()
            // saved a TcpClient pointer in a list, it is perfectly safe to use it
            // here (e.g. to forward the message to someone else).
        }
    };

Then, to launch a server:

    OwnProtocolServerEngine engine;
    engine.init(5555, "127.0.0.1", 10);
    if (!engine.start())
    {
        printf("Could not start server\r\n");
        return -1;
    }
    getchar();
    engine.stop();

The source code for the framework is here

Thread-safe queue

I said earlier that everything happens in a single thread, so everything *should* be safe. But what if I want to send data to a client from another thread? That should be possible. The transmit queue inside the TcpClient is thread-safe, so calling TcpClient::sendData() from another thread is safe. Accessing the client is another story, though: the client instance could get deleted at any time. There are ways around that, but they are beyond the scope of this framework.

Since the TX queue is a Multi-Producer, Single-Consumer FIFO, creating a thread-safe implementation is very easy. Of course, I didn't want to use any of the STL containers.

This uses __sync_bool_compare_and_swap, which on x86 compiles to the LOCK CMPXCHG instruction. I described that instruction here. I didn't fully test that code, though. I adapted it from a version I had written in assembly, so there might be bugs in the C++ version.
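For illustration, here is a sketch of one common multi-producer, single-consumer design built on __sync_bool_compare_and_swap; it is not the framework's actual queue. Producers push nodes onto a shared list with a CAS retry loop, and the single consumer detaches the whole list at once and reverses it to get FIFO order. The class name MpscQueue is hypothetical.

```cpp
#include <cstddef>
#include <cassert>

// Hypothetical MPSC queue using GCC's __sync_bool_compare_and_swap builtin.
template <typename T>
class MpscQueue {
public:
    struct Node {
        T value;
        Node* next;
    };

    MpscQueue() = default;
    MpscQueue(const MpscQueue&) = delete;
    MpscQueue& operator=(const MpscQueue&) = delete;
    ~MpscQueue() {
        Node* n = head_;
        while (n != nullptr) { Node* next = n->next; delete n; n = next; }
    }

    // Any thread: link the new node in front of the current head, retrying
    // if another producer (or the consumer) changed head_ in the meantime.
    void push(const T& value) {
        Node* node = new Node{value, nullptr};
        Node* old;
        do {
            old = head_;
            node->next = old;
        } while (!__sync_bool_compare_and_swap(&head_, old, node));
    }

    // Consumer thread only: atomically detach the whole list, then reverse it
    // so nodes come out in push order. The caller must delete returned nodes.
    Node* takeAll() {
        Node* list;
        do {
            list = head_;
        } while (!__sync_bool_compare_and_swap(&head_, list, static_cast<Node*>(nullptr)));
        Node* fifo = nullptr;
        while (list != nullptr) {
            Node* next = list->next;
            list->next = fifo;
            fifo = list;
            list = next;
        }
        return fifo;
    }

private:
    Node* volatile head_ = nullptr;
};
```

Because the consumer never pops individual nodes and only swaps the head for nullptr, this layout sidesteps the ABA problem that a lock-free pop() would have. In modern C++, std::atomic<Node*> with compare_exchange_weak would express the same idea portably.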