39 #ifndef FRAME_ENDPOINT_H 40 #define FRAME_ENDPOINT_H 46 #include <boost/asio.hpp> 65 class FrameEndpoint:
public std::enable_shared_from_this<FrameEndpoint> {
76 FrameEndpoint(boost::asio::io_service& a_IOService, boost::asio::ip::tcp::socket& a_TcpSocket, uint8_t a_FrameTypeMask = 0xFF): m_IOService(a_IOService), m_TcpSocket(std::move(a_TcpSocket)), m_FrameTypeMask(a_FrameTypeMask) {
78 m_SEPState = SEPSTATE_DISCONNECTED;
79 m_bWriteInProgress =
false;
84 m_BytesInReadBuffer = 0;
85 m_ReadBufferOffset = 0;
86 m_SendBufferOffset = 0;
95 m_OnFrameCallback =
nullptr;
96 m_OnClosedCallback =
nullptr;
110 m_FrameFactoryMap.clear();
111 m_FrameTypeMask = a_FrameTypeMask;
123 void RegisterFrameFactory(
unsigned char a_FrameType, std::function<std::shared_ptr<Frame>(
void)> a_FrameFactory) {
125 assert(a_FrameFactory);
126 unsigned char l_EffectiveFrameType = (a_FrameType & m_FrameTypeMask);
127 assert(m_FrameFactoryMap.find(l_EffectiveFrameType) == m_FrameFactoryMap.end());
128 m_FrameFactoryMap[l_EffectiveFrameType] = a_FrameFactory;
149 assert(m_FrameFactoryMap.empty() ==
false);
150 assert(m_bStarted ==
false);
151 assert(m_bStopped ==
false);
152 assert(m_SEPState == SEPSTATE_DISCONNECTED);
153 assert(m_bWriteInProgress ==
false);
154 assert(m_bReceiving ==
false);
155 m_SendBufferOffset = 0;
157 m_SEPState = SEPSTATE_CONNECTED;
158 auto self(shared_from_this());
160 if (!m_SendQueue.empty()) {
161 m_IOService.post([
this,
self](){ DoWrite(); });
182 if (m_bStarted && (!m_bStopped)) {
184 m_bReceiving =
false;
185 m_TcpSocket.cancel();
187 if (m_OnClosedCallback) {
188 m_OnClosedCallback();
206 auto self(shared_from_this());
207 m_IOService.post([
this,
self](){
208 if ((m_bStarted) && (!m_bStopped) && (m_SEPState == SEPSTATE_CONNECTED)) {
210 bool l_bDeliverSubsequentFrames =
true;
211 while ((m_ReadBufferOffset < m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
212 l_bDeliverSubsequentFrames = EvaluateReadBuffer();
215 if ((m_ReadBufferOffset == m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
238 bool SendFrame(
const Frame& a_Frame, std::function<
void()> a_OnSendDoneCallback =
nullptr) {
239 if (m_SEPState == SEPSTATE_SHUTDOWN) {
240 if (a_OnSendDoneCallback) {
241 m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
248 if (m_SendQueue.size() >= 50) {
249 if (a_OnSendDoneCallback) {
250 m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
257 m_SendQueue.emplace_back(std::make_pair(a_Frame.
Serialize(), a_OnSendDoneCallback));
258 if ((!m_bWriteInProgress) && (!m_SendQueue.empty()) && (m_SEPState == SEPSTATE_CONNECTED)) {
273 m_OnFrameCallback = a_OnFrameCallback;
283 m_OnClosedCallback = a_OnClosedCallback;
291 void ReadNextChunk() {
292 if (m_bStopped || m_bReceiving) {
297 assert(m_ReadBufferOffset == m_BytesInReadBuffer);
298 m_BytesInReadBuffer = 0;
299 m_ReadBufferOffset = 0;
300 assert(m_ReadBufferOffset == 0);
302 auto self(shared_from_this());
303 if (m_bStopped)
return;
304 m_TcpSocket.async_read_some(boost::asio::buffer(m_ReadBuffer, E_MAX_LENGTH),[
this,
self](boost::system::error_code a_ErrorCode, std::size_t a_BytesRead) {
305 if (a_ErrorCode == boost::asio::error::operation_aborted)
return;
306 if (m_bStopped)
return;
307 m_bReceiving =
false;
309 std::cerr <<
"Read error on TCP socket: " << a_ErrorCode <<
", closing" << std::endl;
313 m_BytesInReadBuffer = a_BytesRead;
328 bool EvaluateReadBuffer() {
329 bool l_bAcceptsSubsequentFrames =
true;
330 assert(m_BytesInReadBuffer);
331 if (!m_IncomingFrame) {
333 auto l_FrameFactoryIt = m_FrameFactoryMap.find(m_ReadBuffer[m_ReadBufferOffset] & m_FrameTypeMask);
334 if (l_FrameFactoryIt == m_FrameFactoryMap.end()) {
336 std::cerr <<
"Protocol violation: unknown frame type" << std::endl;
337 l_bAcceptsSubsequentFrames =
false;
341 m_IncomingFrame = l_FrameFactoryIt->second();
342 assert(m_IncomingFrame);
346 if (m_IncomingFrame) {
348 assert(m_IncomingFrame->BytesNeeded() != 0);
349 size_t l_BytesAvailable = (m_BytesInReadBuffer - m_ReadBufferOffset);
350 if (m_IncomingFrame->ParseBytes(m_ReadBuffer, m_ReadBufferOffset, l_BytesAvailable) ==
false) {
352 std::cerr <<
"Protocol violation: invalid frame content" << std::endl;
353 l_bAcceptsSubsequentFrames =
false;
357 if (m_IncomingFrame->BytesNeeded() == 0) {
359 if (m_OnFrameCallback) {
361 l_bAcceptsSubsequentFrames = m_OnFrameCallback(std::move(m_IncomingFrame));
363 m_IncomingFrame.reset();
372 return l_bAcceptsSubsequentFrames;
380 auto self(shared_from_this());
381 if (m_bStopped)
return;
382 m_bWriteInProgress =
true;
383 boost::asio::async_write(m_TcpSocket, boost::asio::buffer(&(m_SendQueue.front().first.data()[m_SendBufferOffset]), (m_SendQueue.front().first.size() - m_SendBufferOffset)),
384 [
this,
self](boost::system::error_code a_ErrorCode, std::size_t a_BytesSent) {
385 if (a_ErrorCode == boost::asio::error::operation_aborted)
return;
386 if (m_bStopped)
return;
388 m_SendBufferOffset += a_BytesSent;
389 if (m_SendBufferOffset == m_SendQueue.front().first.size()) {
391 if (m_SendQueue.front().second) {
392 m_SendQueue.front().second();
396 m_SendQueue.pop_front();
397 m_SendBufferOffset = 0;
398 if (!m_SendQueue.empty()) {
401 m_bWriteInProgress =
false;
403 m_SEPState = SEPSTATE_SHUTDOWN;
404 m_TcpSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
413 std::cerr <<
"TCP write error!" << std::endl;
420 boost::asio::io_service& m_IOService;
421 boost::asio::ip::tcp::socket m_TcpSocket;
422 uint8_t m_FrameTypeMask;
424 std::shared_ptr<Frame> m_IncomingFrame;
425 std::deque<std::pair<std::vector<unsigned char>, std::function<void()>>> m_SendQueue;
426 size_t m_SendBufferOffset;
427 bool m_bWriteInProgress;
429 enum { E_MAX_LENGTH = 65535 };
430 unsigned char m_ReadBuffer[E_MAX_LENGTH];
431 size_t m_BytesInReadBuffer;
432 size_t m_ReadBufferOffset;
436 SEPSTATE_DISCONNECTED = 0,
437 SEPSTATE_CONNECTED = 1,
438 SEPSTATE_SHUTDOWN = 2
440 E_SEPSTATE m_SEPState;
447 std::function<bool(std::shared_ptr<Frame>)> m_OnFrameCallback;
448 std::function<void()> m_OnClosedCallback;
451 std::map<uint8_t, std::function<std::shared_ptr<Frame>(void)>> m_FrameFactoryMap;
454 #endif // FRAME_ENDPOINT_H void Shutdown()
Tear a frame endpoint entity down.
void SetOnFrameCallback(std::function< bool(std::shared_ptr< Frame >)> a_OnFrameCallback)
Provide the callback method for handling of incoming frames.
void TriggerNextFrame()
Trigger delivery of the next incoming frame.
void Start()
Start the frame endpoint entity.
void Close()
Close the frame endpoint entity.
void RegisterFrameFactory(unsigned char a_FrameType, std::function< std::shared_ptr< Frame >(void)> a_FrameFactory)
Register one subsequent frame factory callback.
void ResetFrameFactories(uint8_t a_FrameTypeMask=0xFF)
Forget all provided frame factory callbacks.
This file contains the header declaration of class Frame.
FrameEndpoint(boost::asio::io_service &a_IOService, boost::asio::ip::tcp::socket &a_TcpSocket, uint8_t a_FrameTypeMask=0xFF)
The constructor of FrameEndpoint objects.
virtual const std::vector< unsigned char > Serialize() const =0
The purely virtual serializer method.
void SetOnClosedCallback(std::function< void()> a_OnClosedCallback)
Provide the callback method for handling of connection aborts.
~FrameEndpoint()
The destructor of FrameEndpoint objects.
bool GetWasStarted() const
A getter to query whether this frame endpoint entity was already started.
bool SendFrame(const Frame &a_Frame, std::function< void()> a_OnSendDoneCallback=nullptr)
Enqueue a frame for transmission.