HDLC-Daemon
FrameEndpoint.h
Go to the documentation of this file.
1 
39 #ifndef FRAME_ENDPOINT_H
40 #define FRAME_ENDPOINT_H
41 
42 #include <iostream>
43 #include <memory>
44 #include <map>
45 #include <deque>
46 #include <boost/asio.hpp>
47 #include <assert.h>
48 #include "Frame.h"
49 
65 class FrameEndpoint: public std::enable_shared_from_this<FrameEndpoint> {
66 public:
76  FrameEndpoint(boost::asio::io_service& a_IOService, boost::asio::ip::tcp::socket& a_TcpSocket, uint8_t a_FrameTypeMask = 0xFF): m_IOService(a_IOService), m_TcpSocket(std::move(a_TcpSocket)), m_FrameTypeMask(a_FrameTypeMask) {
77  // Initalize all remaining members
78  m_SEPState = SEPSTATE_DISCONNECTED;
79  m_bWriteInProgress = false;
80  m_bShutdown = false;
81  m_bStarted = false;
82  m_bStopped = false;
83  m_bReceiving = false;
84  m_BytesInReadBuffer = 0;
85  m_ReadBufferOffset = 0;
86  m_SendBufferOffset = 0;
87  }
88 
94  // Drop all callbacks and assure that Close() was called
95  m_OnFrameCallback = nullptr;
96  m_OnClosedCallback = nullptr;
97  Close();
98  }
99 
108  void ResetFrameFactories(uint8_t a_FrameTypeMask = 0xFF) {
109  // Drop all frame factories and copy the new filter mask
110  m_FrameFactoryMap.clear();
111  m_FrameTypeMask = a_FrameTypeMask;
112  }
113 
123  void RegisterFrameFactory(unsigned char a_FrameType, std::function<std::shared_ptr<Frame>(void)> a_FrameFactory) {
124  // Check that there is no frame factory for the specified frame type yet. If not then add it.
125  assert(a_FrameFactory);
126  unsigned char l_EffectiveFrameType = (a_FrameType & m_FrameTypeMask);
127  assert(m_FrameFactoryMap.find(l_EffectiveFrameType) == m_FrameFactoryMap.end());
128  m_FrameFactoryMap[l_EffectiveFrameType] = a_FrameFactory;
129  }
130 
137  bool GetWasStarted() const {
138  return m_bStarted;
139  }
140 
147  void Start() {
148  // There must be at least one frame factory available!
149  assert(m_FrameFactoryMap.empty() == false);
150  assert(m_bStarted == false);
151  assert(m_bStopped == false);
152  assert(m_SEPState == SEPSTATE_DISCONNECTED);
153  assert(m_bWriteInProgress == false);
154  assert(m_bReceiving == false);
155  m_SendBufferOffset = 0;
156  m_bStarted = true;
157  m_SEPState = SEPSTATE_CONNECTED;
158  auto self(shared_from_this());
160  if (!m_SendQueue.empty()) {
161  m_IOService.post([this, self](){ DoWrite(); });
162  } // if
163  }
164 
172  void Shutdown() {
173  m_bShutdown = true;
174  }
175 
181  void Close() {
182  if (m_bStarted && (!m_bStopped)) {
183  m_bStopped = true;
184  m_bReceiving = false;
185  m_TcpSocket.cancel();
186  m_TcpSocket.close();
187  if (m_OnClosedCallback) {
188  m_OnClosedCallback();
189  } // if
190  } // if
191  }
192 
201  // Checks
202  if (m_bReceiving) {
203  return;
204  } // if
205 
206  auto self(shared_from_this());
207  m_IOService.post([this,self](){
208  if ((m_bStarted) && (!m_bStopped) && (m_SEPState == SEPSTATE_CONNECTED)) {
209  // Consume all bytes as long as frames are consumed
210  bool l_bDeliverSubsequentFrames = true;
211  while ((m_ReadBufferOffset < m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
212  l_bDeliverSubsequentFrames = EvaluateReadBuffer();
213  } // while
214 
215  if ((m_ReadBufferOffset == m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
216  // No bytes available anymore / yet
217  ReadNextChunk();
218  } // if
219  } // if
220  }); // post
221  }
222 
238  bool SendFrame(const Frame& a_Frame, std::function<void()> a_OnSendDoneCallback = nullptr) {
239  if (m_SEPState == SEPSTATE_SHUTDOWN) {
240  if (a_OnSendDoneCallback) {
241  m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
242  } // if
243 
244  return false;
245  } // if
246 
247  // TODO: check size of the queue. If it reaches a specific limit: kill the socket to prevent DoS attacks
248  if (m_SendQueue.size() >= 50) {
249  if (a_OnSendDoneCallback) {
250  m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
251  } // if
252 
253  // TODO: check what happens if this is caused by an important packet, e.g., a keep alive or an echo response packet
254  return false;
255  } // if
256 
257  m_SendQueue.emplace_back(std::make_pair(a_Frame.Serialize(), a_OnSendDoneCallback));
258  if ((!m_bWriteInProgress) && (!m_SendQueue.empty()) && (m_SEPState == SEPSTATE_CONNECTED)) {
259  DoWrite();
260  } // if
261 
262  return true;
263  }
264 
272  void SetOnFrameCallback(std::function<bool(std::shared_ptr<Frame>)> a_OnFrameCallback) {
273  m_OnFrameCallback = a_OnFrameCallback;
274  }
275 
282  void SetOnClosedCallback(std::function<void()> a_OnClosedCallback) {
283  m_OnClosedCallback = a_OnClosedCallback;
284  }
285 
286 private:
291  void ReadNextChunk() {
292  if (m_bStopped || m_bReceiving) {
293  // Already stopped / a later trigger will happen
294  return;
295  } // if
296 
297  assert(m_ReadBufferOffset == m_BytesInReadBuffer);
298  m_BytesInReadBuffer = 0;
299  m_ReadBufferOffset = 0;
300  assert(m_ReadBufferOffset == 0);
301  m_bReceiving = true;
302  auto self(shared_from_this());
303  if (m_bStopped) return;
304  m_TcpSocket.async_read_some(boost::asio::buffer(m_ReadBuffer, E_MAX_LENGTH),[this, self](boost::system::error_code a_ErrorCode, std::size_t a_BytesRead) {
305  if (a_ErrorCode == boost::asio::error::operation_aborted) return;
306  if (m_bStopped) return;
307  m_bReceiving = false;
308  if (a_ErrorCode) {
309  std::cerr << "Read error on TCP socket: " << a_ErrorCode << ", closing" << std::endl;
310  Close();
311  } else {
312  // Evaluate the bytes at hand
313  m_BytesInReadBuffer = a_BytesRead;
315  } // else
316  }); // async_read_some
317  }
318 
328  bool EvaluateReadBuffer() {
329  bool l_bAcceptsSubsequentFrames = true;
330  assert(m_BytesInReadBuffer);
331  if (!m_IncomingFrame) {
332  // No frame is waiting yet... create it
333  auto l_FrameFactoryIt = m_FrameFactoryMap.find(m_ReadBuffer[m_ReadBufferOffset] & m_FrameTypeMask);
334  if (l_FrameFactoryIt == m_FrameFactoryMap.end()) {
335  // Error, no suitable frame factory available!
336  std::cerr << "Protocol violation: unknown frame type" << std::endl;
337  l_bAcceptsSubsequentFrames = false;
338  Close();
339  } else {
340  // Create new frame
341  m_IncomingFrame = l_FrameFactoryIt->second();
342  assert(m_IncomingFrame);
343  } // else
344  } // else
345 
346  if (m_IncomingFrame) {
347  // Feed the waiting frame with data
348  assert(m_IncomingFrame->BytesNeeded() != 0);
349  size_t l_BytesAvailable = (m_BytesInReadBuffer - m_ReadBufferOffset);
350  if (m_IncomingFrame->ParseBytes(m_ReadBuffer, m_ReadBufferOffset, l_BytesAvailable) == false) {
351  // Parser error
352  std::cerr << "Protocol violation: invalid frame content" << std::endl;
353  l_bAcceptsSubsequentFrames = false;
354  Close();
355  } else {
356  // No error while parsing
357  if (m_IncomingFrame->BytesNeeded() == 0) {
358  // The frame is complete
359  if (m_OnFrameCallback) {
360  // Deliver the data packet but maybe stall the receiver
361  l_bAcceptsSubsequentFrames = m_OnFrameCallback(std::move(m_IncomingFrame));
362  } else {
363  m_IncomingFrame.reset();
364  } // else
365  } else {
366  // The frame is not complete yet
367  ReadNextChunk();
368  } // else
369  } // else
370  } // if
371 
372  return l_bAcceptsSubsequentFrames;
373  }
374 
379  void DoWrite() {
380  auto self(shared_from_this());
381  if (m_bStopped) return;
382  m_bWriteInProgress = true;
383  boost::asio::async_write(m_TcpSocket, boost::asio::buffer(&(m_SendQueue.front().first.data()[m_SendBufferOffset]), (m_SendQueue.front().first.size() - m_SendBufferOffset)),
384  [this, self](boost::system::error_code a_ErrorCode, std::size_t a_BytesSent) {
385  if (a_ErrorCode == boost::asio::error::operation_aborted) return;
386  if (m_bStopped) return;
387  if (!a_ErrorCode) {
388  m_SendBufferOffset += a_BytesSent;
389  if (m_SendBufferOffset == m_SendQueue.front().first.size()) {
390  // Completed transmission. If a callback was provided, call it now to demand for a subsequent packet
391  if (m_SendQueue.front().second) {
392  m_SendQueue.front().second();
393  } // if
394 
395  // Remove transmitted packet
396  m_SendQueue.pop_front();
397  m_SendBufferOffset = 0;
398  if (!m_SendQueue.empty()) {
399  DoWrite();
400  } else {
401  m_bWriteInProgress = false;
402  if (m_bShutdown) {
403  m_SEPState = SEPSTATE_SHUTDOWN;
404  m_TcpSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
405  Close();
406  } // if
407  } // else
408  } else {
409  // Only a partial transmission. We are not done yet.
410  DoWrite();
411  } // else
412  } else {
413  std::cerr << "TCP write error!" << std::endl;
414  Close();
415  } // else
416  }); // async_write
417  }
418 
419  // Members
420  boost::asio::io_service& m_IOService;
421  boost::asio::ip::tcp::socket m_TcpSocket;
422  uint8_t m_FrameTypeMask;
423 
424  std::shared_ptr<Frame> m_IncomingFrame;
425  std::deque<std::pair<std::vector<unsigned char>, std::function<void()>>> m_SendQueue;
426  size_t m_SendBufferOffset;
427  bool m_bWriteInProgress;
428 
429  enum { E_MAX_LENGTH = 65535 };
430  unsigned char m_ReadBuffer[E_MAX_LENGTH];
431  size_t m_BytesInReadBuffer;
432  size_t m_ReadBufferOffset;
433 
434  // State
435  typedef enum {
436  SEPSTATE_DISCONNECTED = 0,
437  SEPSTATE_CONNECTED = 1,
438  SEPSTATE_SHUTDOWN = 2
439  } E_SEPSTATE;
440  E_SEPSTATE m_SEPState;
441  bool m_bShutdown;
442  bool m_bStarted;
443  bool m_bStopped;
444  bool m_bReceiving;
445 
446  // Callbacks
447  std::function<bool(std::shared_ptr<Frame>)> m_OnFrameCallback;
448  std::function<void()> m_OnClosedCallback;
449 
450  // The frame factories
451  std::map<uint8_t, std::function<std::shared_ptr<Frame>(void)>> m_FrameFactoryMap;
452 };
453 
454 #endif // FRAME_ENDPOINT_H
void Shutdown()
Tear a frame endpoint entity down.
void SetOnFrameCallback(std::function< bool(std::shared_ptr< Frame >)> a_OnFrameCallback)
Provide the callback method for handling of incoming frames.
void TriggerNextFrame()
Trigger delivery of the next incoming frame.
void Start()
Start the frame endpoint entity.
void Close()
Close the frame endpoint entity.
Class FrameEndpoint.
Definition: FrameEndpoint.h:65
void RegisterFrameFactory(unsigned char a_FrameType, std::function< std::shared_ptr< Frame >(void)> a_FrameFactory)
Register one subsequent frame factory callback.
void ResetFrameFactories(uint8_t a_FrameTypeMask=0xFF)
Forget all provided frame factory callbacks.
This file contains the header declaration of class Frame.
FrameEndpoint(boost::asio::io_service &a_IOService, boost::asio::ip::tcp::socket &a_TcpSocket, uint8_t a_FrameTypeMask=0xFF)
The constructor of FrameEndpoint objects.
Definition: FrameEndpoint.h:76
virtual const std::vector< unsigned char > Serialize() const =0
The purely virtual serializer method.
void SetOnClosedCallback(std::function< void()> a_OnClosedCallback)
Provide the callback method for handling of connection aborts.
~FrameEndpoint()
The destructor of FrameEndpoint objects.
Definition: FrameEndpoint.h:93
bool GetWasStarted() const
A getter to query whether this frame endpoint entity was already started.
Class Frame.
Definition: Frame.h:59
bool SendFrame(const Frame &a_Frame, std::function< void()> a_OnSendDoneCallback=nullptr)
Enqueue a frame for transmission.