37 #ifndef FRAME_ENDPOINT_H 38 #define FRAME_ENDPOINT_H 44 #include <boost/asio.hpp> 48 class FrameEndpoint:
public std::enable_shared_from_this<FrameEndpoint> {
51 FrameEndpoint(boost::asio::io_service& a_IOService, boost::asio::ip::tcp::socket& a_TcpSocket, uint8_t a_FrameTypeMask = 0xFF): m_IOService(a_IOService), m_TcpSocket(std::move(a_TcpSocket)), m_FrameTypeMask(a_FrameTypeMask) {
53 m_SEPState = SEPSTATE_DISCONNECTED;
54 m_bWriteInProgress =
false;
59 m_BytesInReadBuffer = 0;
60 m_ReadBufferOffset = 0;
61 m_SendBufferOffset = 0;
66 m_OnFrameCallback =
nullptr;
67 m_OnClosedCallback =
nullptr;
73 m_FrameFactoryMap.clear();
74 m_FrameTypeMask = a_FrameTypeMask;
77 void RegisterFrameFactory(
unsigned char a_FrameType, std::function<std::shared_ptr<Frame>(
void)> a_FrameFactory) {
79 assert(a_FrameFactory);
80 unsigned char l_EffectiveFrameType = (a_FrameType & m_FrameTypeMask);
81 assert(m_FrameFactoryMap.find(l_EffectiveFrameType) == m_FrameFactoryMap.end());
82 m_FrameFactoryMap[l_EffectiveFrameType] = a_FrameFactory;
91 assert(m_FrameFactoryMap.empty() ==
false);
92 assert(m_bStarted ==
false);
93 assert(m_bStopped ==
false);
94 assert(m_SEPState == SEPSTATE_DISCONNECTED);
95 assert(m_bWriteInProgress ==
false);
96 assert(m_bReceiving ==
false);
97 m_SendBufferOffset = 0;
99 m_SEPState = SEPSTATE_CONNECTED;
100 auto self(shared_from_this());
102 if (!m_SendQueue.empty()) {
103 m_IOService.post([
this,
self](){ DoWrite(); });
112 if (m_bStarted && (!m_bStopped)) {
114 m_bReceiving =
false;
115 m_TcpSocket.cancel();
117 if (m_OnClosedCallback) {
118 m_OnClosedCallback();
129 auto self(shared_from_this());
130 m_IOService.post([
this,
self](){
131 if ((m_bStarted) && (!m_bStopped) && (m_SEPState == SEPSTATE_CONNECTED)) {
133 bool l_bDeliverSubsequentFrames =
true;
134 while ((m_ReadBufferOffset < m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
135 l_bDeliverSubsequentFrames = EvaluateReadBuffer();
138 if ((m_ReadBufferOffset == m_BytesInReadBuffer) && (l_bDeliverSubsequentFrames) && (!m_bStopped)) {
146 bool SendFrame(
const Frame& a_Frame, std::function<
void()> a_OnSendDoneCallback =
nullptr) {
147 if (m_SEPState == SEPSTATE_SHUTDOWN) {
148 if (a_OnSendDoneCallback) {
149 m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
156 if (m_SendQueue.size() >= 50) {
157 if (a_OnSendDoneCallback) {
158 m_IOService.post([a_OnSendDoneCallback](){ a_OnSendDoneCallback(); });
165 m_SendQueue.emplace_back(std::make_pair(a_Frame.
Serialize(), a_OnSendDoneCallback));
166 if ((!m_bWriteInProgress) && (!m_SendQueue.empty()) && (m_SEPState == SEPSTATE_CONNECTED)) {
174 m_OnFrameCallback = a_OnFrameCallback;
178 m_OnClosedCallback = a_OnClosedCallback;
182 void ReadNextChunk() {
183 if (m_bStopped || m_bReceiving) {
188 assert(m_ReadBufferOffset == m_BytesInReadBuffer);
189 m_BytesInReadBuffer = 0;
190 m_ReadBufferOffset = 0;
191 assert(m_ReadBufferOffset == 0);
193 auto self(shared_from_this());
194 if (m_bStopped)
return;
195 m_TcpSocket.async_read_some(boost::asio::buffer(m_ReadBuffer, E_MAX_LENGTH),[
this,
self](boost::system::error_code a_ErrorCode, std::size_t a_BytesRead) {
196 if (a_ErrorCode == boost::asio::error::operation_aborted)
return;
197 if (m_bStopped)
return;
198 m_bReceiving =
false;
200 std::cerr <<
"Read error on TCP socket: " << a_ErrorCode <<
", closing" << std::endl;
204 m_BytesInReadBuffer = a_BytesRead;
210 bool EvaluateReadBuffer() {
211 bool l_bAcceptsSubsequentFrames =
true;
212 assert(m_BytesInReadBuffer);
213 if (!m_IncomingFrame) {
215 auto l_FrameFactoryIt = m_FrameFactoryMap.find(m_ReadBuffer[m_ReadBufferOffset] & m_FrameTypeMask);
216 if (l_FrameFactoryIt == m_FrameFactoryMap.end()) {
218 std::cerr <<
"Protocol violation: unknown frame type" << std::endl;
219 l_bAcceptsSubsequentFrames =
false;
223 m_IncomingFrame = l_FrameFactoryIt->second();
224 assert(m_IncomingFrame);
228 if (m_IncomingFrame) {
230 assert(m_IncomingFrame->BytesNeeded() != 0);
231 size_t l_BytesAvailable = (m_BytesInReadBuffer - m_ReadBufferOffset);
232 if (m_IncomingFrame->ParseBytes(m_ReadBuffer, m_ReadBufferOffset, l_BytesAvailable) ==
false) {
234 std::cerr <<
"Protocol violation: invalid frame content" << std::endl;
235 l_bAcceptsSubsequentFrames =
false;
239 if (m_IncomingFrame->BytesNeeded() == 0) {
241 if (m_OnFrameCallback) {
243 l_bAcceptsSubsequentFrames = m_OnFrameCallback(std::move(m_IncomingFrame));
245 m_IncomingFrame.reset();
254 return l_bAcceptsSubsequentFrames;
258 auto self(shared_from_this());
259 if (m_bStopped)
return;
260 m_bWriteInProgress =
true;
261 boost::asio::async_write(m_TcpSocket, boost::asio::buffer(&(m_SendQueue.front().first.data()[m_SendBufferOffset]), (m_SendQueue.front().first.size() - m_SendBufferOffset)),
262 [
this,
self](boost::system::error_code a_ErrorCode, std::size_t a_BytesSent) {
263 if (a_ErrorCode == boost::asio::error::operation_aborted)
return;
264 if (m_bStopped)
return;
266 m_SendBufferOffset += a_BytesSent;
267 if (m_SendBufferOffset == m_SendQueue.front().first.size()) {
269 if (m_SendQueue.front().second) {
270 m_SendQueue.front().second();
274 m_SendQueue.pop_front();
275 m_SendBufferOffset = 0;
276 if (!m_SendQueue.empty()) {
279 m_bWriteInProgress =
false;
281 m_SEPState = SEPSTATE_SHUTDOWN;
282 m_TcpSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
291 std::cerr <<
"TCP write error!" << std::endl;
298 boost::asio::io_service& m_IOService;
299 boost::asio::ip::tcp::socket m_TcpSocket;
300 uint8_t m_FrameTypeMask;
302 std::shared_ptr<Frame> m_IncomingFrame;
303 std::deque<std::pair<std::vector<unsigned char>, std::function<void()>>> m_SendQueue;
304 size_t m_SendBufferOffset;
305 bool m_bWriteInProgress;
307 enum { E_MAX_LENGTH = 65535 };
308 unsigned char m_ReadBuffer[E_MAX_LENGTH];
309 size_t m_BytesInReadBuffer;
310 size_t m_ReadBufferOffset;
314 SEPSTATE_DISCONNECTED = 0,
315 SEPSTATE_CONNECTED = 1,
316 SEPSTATE_SHUTDOWN = 2
318 E_SEPSTATE m_SEPState;
325 std::function<bool(std::shared_ptr<Frame>)> m_OnFrameCallback;
326 std::function<void()> m_OnClosedCallback;
329 std::map<uint8_t, std::function<std::shared_ptr<Frame>(void)>> m_FrameFactoryMap;
332 #endif // FRAME_ENDPOINT_H
void SetOnFrameCallback(std::function< bool(std::shared_ptr< Frame >)> a_OnFrameCallback)
void RegisterFrameFactory(unsigned char a_FrameType, std::function< std::shared_ptr< Frame >(void)> a_FrameFactory)
void ResetFrameFactories(uint8_t a_FrameTypeMask=0xFF)
Copyright (c) 2016, Florian Evers, florian-evers@gmx.de All rights reserved.
FrameEndpoint(boost::asio::io_service &a_IOService, boost::asio::ip::tcp::socket &a_TcpSocket, uint8_t a_FrameTypeMask=0xFF)
virtual const std::vector< unsigned char > Serialize() const =0
void SetOnClosedCallback(std::function< void()> a_OnClosedCallback)
bool GetWasStarted() const
bool SendFrame(const Frame &a_Frame, std::function< void()> a_OnSendDoneCallback=nullptr)