s-net-Tools
FrameEndpoint.h
Go to the documentation of this file.
1 
37 #ifndef FRAME_ENDPOINT_H
38 #define FRAME_ENDPOINT_H
39 
40 #include <iostream>
41 #include <memory>
42 #include <map>
43 #include <deque>
44 #include <boost/asio.hpp>
45 #include <assert.h>
46 #include "Frame.h"
47 
48 class FrameEndpoint: public std::enable_shared_from_this<FrameEndpoint> {
49 public:
50  // CTOR and DTOR
51  FrameEndpoint(boost::asio::io_service& a_IOService, boost::asio::ip::tcp::socket& a_TcpSocket, uint8_t a_FrameTypeMask = 0xFF): m_IOService(a_IOService), m_TcpSocket(std::move(a_TcpSocket)), m_FrameTypeMask(a_FrameTypeMask) {
52  // Initalize all remaining members
53  m_SEPState = SEPSTATE_DISCONNECTED;
54  m_bWriteInProgress = false;
55  m_bShutdown = false;
56  m_bStarted = false;
57  m_bStopped = false;
58  m_bReceiving = false;
59  m_BytesInReadBuffer = 0;
60  m_ReadBufferOffset = 0;
61  m_SendBufferOffset = 0;
62  }
63 
65  // Drop all callbacks and assure that close was called
66  m_OnFrameCallback = nullptr;
67  m_OnClosedCallback = nullptr;
68  Close();
69  }
70 
71  void ResetFrameFactories(uint8_t a_FrameTypeMask = 0xFF) {
72  // Drop all frame factories and copy the new filter mask
73  m_FrameFactoryMap.clear();
74  m_FrameTypeMask = a_FrameTypeMask;
75  }
76 
77  void RegisterFrameFactory(unsigned char a_FrameType, std::function<std::shared_ptr<Frame>(void)> a_FrameFactory) {
78  // Check that there is no frame factory for the specified frame type yet. If not then add it.
79  assert(a_FrameFactory);
80  unsigned char l_EffectiveFrameType = (a_FrameType & m_FrameTypeMask);
81  assert(m_FrameFactoryMap.find(l_EffectiveFrameType) == m_FrameFactoryMap.end());
82  m_FrameFactoryMap[l_EffectiveFrameType] = a_FrameFactory;
83  }
84 
85  bool GetWasStarted() const {
86  return m_bStarted;
87  }
88 
89  void Start() {
90  // There must be at least one frame factory available!
91  assert(m_FrameFactoryMap.empty() == false);
92  assert(m_bStarted == false);
93  assert(m_bStopped == false);
94  assert(m_SEPState == SEPSTATE_DISCONNECTED);
95  assert(m_bWriteInProgress == false);
96  assert(m_bReceiving == false);
97  m_SendBufferOffset = 0;
98  m_bStarted = true;
99  m_SEPState = SEPSTATE_CONNECTED;
100  auto self(shared_from_this());
102  if (!m_SendQueue.empty()) {
103  m_IOService.post([this, self](){ DoWrite(); });
104  } // if
105  }
106 
107  void Shutdown() {
108  m_bShutdown = true;
109  }
110 
111  void Close() {
112  if (m_bStarted && (!m_bStopped)) {
113  m_bStopped = true;
114  m_bReceiving = false;
115  m_TcpSocket.cancel();
116  m_TcpSocket.close();
117  if (m_OnClosedCallback) {
118  m_OnClosedCallback();
119  } // if
120  } // if
121  }
122 
124  // Checks
125  if (m_bReceiving) {
126  return;
127  } // if
128 
129  auto self(shared_from_this());
130  m_IOService.post([this,self](){
131  if ((m_bStarted) && (!m_bStopped) && (m_SEPState == SEPSTATE_CONNECTED)) {
132  // Consume all bytes as long as frames are consumed
133  bool l_bDeliverSubsequentFrames = true;
134  while ((m_ReadBufferOffset < m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
135  l_bDeliverSubsequentFrames = EvaluateReadBuffer();
136  } // while
137 
138  if ((m_ReadBufferOffset == m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
139  // No bytes available anymore / yet
140  ReadNextChunk();
141  } // if
142  } // if
143  }); // post
144  }
145 
146  bool SendFrame(const Frame& a_Frame, std::function<void()> a_OnSendDoneCallback = nullptr) {
147  if (m_SEPState == SEPSTATE_SHUTDOWN) {
148  if (a_OnSendDoneCallback) {
149  m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
150  } // if
151 
152  return false;
153  } // if
154 
155  // TODO: check size of the queue. If it reaches a specific limit: kill the socket to prevent DoS attacks
156  if (m_SendQueue.size() >= 50) {
157  if (a_OnSendDoneCallback) {
158  m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
159  } // if
160 
161  // TODO: check what happens if this is caused by an important packet, e.g., a keep alive or an echo response packet
162  return false;
163  } // if
164 
165  m_SendQueue.emplace_back(std::make_pair(a_Frame.Serialize(), a_OnSendDoneCallback));
166  if ((!m_bWriteInProgress) && (!m_SendQueue.empty()) && (m_SEPState == SEPSTATE_CONNECTED)) {
167  DoWrite();
168  } // if
169 
170  return true;
171  }
172 
173  void SetOnFrameCallback(std::function<bool(std::shared_ptr<Frame>)> a_OnFrameCallback) {
174  m_OnFrameCallback = a_OnFrameCallback;
175  }
176 
177  void SetOnClosedCallback(std::function<void()> a_OnClosedCallback) {
178  m_OnClosedCallback = a_OnClosedCallback;
179  }
180 
181 private:
182  void ReadNextChunk() {
183  if (m_bStopped || m_bReceiving) {
184  // Already stopped / a later trigger will happen
185  return;
186  } // if
187 
188  assert(m_ReadBufferOffset == m_BytesInReadBuffer);
189  m_BytesInReadBuffer = 0;
190  m_ReadBufferOffset = 0;
191  assert(m_ReadBufferOffset == 0);
192  m_bReceiving = true;
193  auto self(shared_from_this());
194  if (m_bStopped) return;
195  m_TcpSocket.async_read_some(boost::asio::buffer(m_ReadBuffer, E_MAX_LENGTH),[this, self](boost::system::error_code a_ErrorCode, std::size_t a_BytesRead) {
196  if (a_ErrorCode == boost::asio::error::operation_aborted) return;
197  if (m_bStopped) return;
198  m_bReceiving = false;
199  if (a_ErrorCode) {
200  std::cerr << "Read error on TCP socket: " << a_ErrorCode << ", closing" << std::endl;
201  Close();
202  } else {
203  // Evaluate the bytes at hand
204  m_BytesInReadBuffer = a_BytesRead;
206  } // else
207  }); // async_read_some
208  }
209 
210  bool EvaluateReadBuffer() {
211  bool l_bAcceptsSubsequentFrames = true;
212  assert(m_BytesInReadBuffer);
213  if (!m_IncomingFrame) {
214  // No frame is waiting yet... create it
215  auto l_FrameFactoryIt = m_FrameFactoryMap.find(m_ReadBuffer[m_ReadBufferOffset] & m_FrameTypeMask);
216  if (l_FrameFactoryIt == m_FrameFactoryMap.end()) {
217  // Error, no suitable frame factory available!
218  std::cerr << "Protocol violation: unknown frame type" << std::endl;
219  l_bAcceptsSubsequentFrames = false;
220  Close();
221  } else {
222  // Create new frame
223  m_IncomingFrame = l_FrameFactoryIt->second();
224  assert(m_IncomingFrame);
225  } // else
226  } // else
227 
228  if (m_IncomingFrame) {
229  // Feed the waiting frame with data
230  assert(m_IncomingFrame->BytesNeeded() != 0);
231  size_t l_BytesAvailable = (m_BytesInReadBuffer - m_ReadBufferOffset);
232  if (m_IncomingFrame->ParseBytes(m_ReadBuffer, m_ReadBufferOffset, l_BytesAvailable) == false) {
233  // Parser error
234  std::cerr << "Protocol violation: invalid frame content" << std::endl;
235  l_bAcceptsSubsequentFrames = false;
236  Close();
237  } else {
238  // No error while parsing
239  if (m_IncomingFrame->BytesNeeded() == 0) {
240  // The frame is complete
241  if (m_OnFrameCallback) {
242  // Deliver the data packet but maybe stall the receiver
243  l_bAcceptsSubsequentFrames = m_OnFrameCallback(std::move(m_IncomingFrame));
244  } else {
245  m_IncomingFrame.reset();
246  } // else
247  } else {
248  // The frame is not complete yet
249  ReadNextChunk();
250  } // else
251  } // else
252  } // if
253 
254  return l_bAcceptsSubsequentFrames;
255  }
256 
257  void DoWrite() {
258  auto self(shared_from_this());
259  if (m_bStopped) return;
260  m_bWriteInProgress = true;
261  boost::asio::async_write(m_TcpSocket, boost::asio::buffer(&(m_SendQueue.front().first.data()[m_SendBufferOffset]), (m_SendQueue.front().first.size() - m_SendBufferOffset)),
262  [this, self](boost::system::error_code a_ErrorCode, std::size_t a_BytesSent) {
263  if (a_ErrorCode == boost::asio::error::operation_aborted) return;
264  if (m_bStopped) return;
265  if (!a_ErrorCode) {
266  m_SendBufferOffset += a_BytesSent;
267  if (m_SendBufferOffset == m_SendQueue.front().first.size()) {
268  // Completed transmission. If a callback was provided, call it now to demand for a subsequent packet
269  if (m_SendQueue.front().second) {
270  m_SendQueue.front().second();
271  } // if
272 
273  // Remove transmitted packet
274  m_SendQueue.pop_front();
275  m_SendBufferOffset = 0;
276  if (!m_SendQueue.empty()) {
277  DoWrite();
278  } else {
279  m_bWriteInProgress = false;
280  if (m_bShutdown) {
281  m_SEPState = SEPSTATE_SHUTDOWN;
282  m_TcpSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
283  Close();
284  } // if
285  } // else
286  } else {
287  // Only a partial transmission. We are not done yet.
288  DoWrite();
289  } // else
290  } else {
291  std::cerr << "TCP write error!" << std::endl;
292  Close();
293  } // else
294  }); // async_write
295  }
296 
297  // Members
298  boost::asio::io_service& m_IOService;
299  boost::asio::ip::tcp::socket m_TcpSocket;
300  uint8_t m_FrameTypeMask;
301 
302  std::shared_ptr<Frame> m_IncomingFrame;
303  std::deque<std::pair<std::vector<unsigned char>, std::function<void()>>> m_SendQueue; // To be transmitted
304  size_t m_SendBufferOffset;
305  bool m_bWriteInProgress;
306 
307  enum { E_MAX_LENGTH = 65535 };
308  unsigned char m_ReadBuffer[E_MAX_LENGTH];
309  size_t m_BytesInReadBuffer;
310  size_t m_ReadBufferOffset;
311 
312  // State
313  typedef enum {
314  SEPSTATE_DISCONNECTED = 0,
315  SEPSTATE_CONNECTED = 1,
316  SEPSTATE_SHUTDOWN = 2
317  } E_SEPSTATE;
318  E_SEPSTATE m_SEPState;
319  bool m_bShutdown;
320  bool m_bStarted;
321  bool m_bStopped;
322  bool m_bReceiving;
323 
324  // Callbacks
325  std::function<bool(std::shared_ptr<Frame>)> m_OnFrameCallback;
326  std::function<void()> m_OnClosedCallback;
327 
328  // The frame factories
329  std::map<uint8_t, std::function<std::shared_ptr<Frame>(void)>> m_FrameFactoryMap;
330 };
331 
332 #endif // FRAME_ENDPOINT_H
void SetOnFrameCallback(std::function< bool(std::shared_ptr< Frame >)> a_OnFrameCallback)
void TriggerNextFrame()
void RegisterFrameFactory(unsigned char a_FrameType, std::function< std::shared_ptr< Frame >(void)> a_FrameFactory)
Definition: FrameEndpoint.h:77
void ResetFrameFactories(uint8_t a_FrameTypeMask=0xFF)
Definition: FrameEndpoint.h:71
Copyright (c) 2016, Florian Evers, florian-evers@gmx.de All rights reserved.
FrameEndpoint(boost::asio::io_service &a_IOService, boost::asio::ip::tcp::socket &a_TcpSocket, uint8_t a_FrameTypeMask=0xFF)
Definition: FrameEndpoint.h:51
virtual const std::vector< unsigned char > Serialize() const =0
void SetOnClosedCallback(std::function< void()> a_OnClosedCallback)
bool GetWasStarted() const
Definition: FrameEndpoint.h:85
Definition: Frame.h:44
bool SendFrame(const Frame &a_Frame, std::function< void()> a_OnSendDoneCallback=nullptr)