diff --git a/asyncmsg/Session.h b/asyncmsg/Session.h index a4a5f44b895cb5f2546f76e2095b14f226ed3ba2..3f981d718075368e575c3e57ee649a2b53e22b44 100644 --- a/asyncmsg/Session.h +++ b/asyncmsg/Session.h @@ -37,7 +37,8 @@ public: enum class State { CLOSED, OPEN_PENDING, - OPEN + OPEN, + CLOSE_PENDING }; /*! \brief Defines the upper limit for message type identifiers. @@ -190,13 +191,31 @@ protected: public: - /*! \brief Closes the session. + /*! \brief Initiates an asynchronous close. * - * This method immediately closes the session. Pending asynchronous operations will be canceled - * and the corresponding "failed" completion methods will be invoked with the - * boost::asio::error::operation_aborted error. + * Asynchronously closes the session. Pending asynchronous operations will be canceled and the + * corresponding "failed" completion methods will be invoked with the + * boost::asio::error::operation_aborted error. When all pending completion methods have been + * executed, onClose() will be invoked. If an error occurs, onCloseError() will be invoked + * instead. */ - void close(); + void asyncClose(); + +protected: + + /*! \brief Completion method for successful asyncClose(). + * + * This method is called when a close operation completes successfully. + */ + virtual void onClose() noexcept = 0; + + /*! \brief Completion method for failed asyncClose(). + * + * This method is called when an error occurs during a close operation. + * + * \param[in] error Indicates what error occurred. + */ + virtual void onCloseError(const boost::system::error_code& error) noexcept = 0; public: @@ -225,6 +244,8 @@ private: friend class Server; void startOpen(const std::shared_ptr<Session>& self); + void abortOpen(); + void checkClose(); void startReceive(const std::shared_ptr<Session>& self); void startSend(const std::shared_ptr<Session>& self); diff --git a/src/Session.cxx b/src/Session.cxx index 36d0b6c6378bf1178f54765db7976bcc6d463b08..234b5d740c565047fae25557589daebc31ae4fa7 100644 --- a/src/Session.cxx +++ b/src/Session.cxx @@ -2,6 +2,7 @@ #include "asyncmsg/Error.h" +#include <boost/asio/error.hpp> #include <boost/asio/read.hpp> #include <boost/asio/write.hpp> #include <boost/range/iterator_range.hpp> @@ -25,7 +26,6 @@ Session::Session(boost::asio::io_service& ioService) : Session::~Session() { - close(); } void Session::asyncOpen(const std::string& localName, @@ -47,8 +47,8 @@ void Session::asyncOpen(const std::string& localName, m_socket.async_connect(remoteEndpoint, m_strand.wrap( [this, self] (const boost::system::error_code& error) { if (error) { - close(); onOpenError(error); + abortOpen(); } else { startOpen(self); @@ -73,7 +73,7 @@ void Session::startOpen(const std::shared_ptr<Session>& self) [this, self] (const boost::system::error_code& error, std::size_t) { if (error) { onOpenError(error); - close(); + abortOpen(); return; } // 3. Receive the HELLO message header @@ -81,12 +81,12 @@ void Session::startOpen(const std::shared_ptr<Session>& self) [this, self] (const boost::system::error_code& error, std::size_t) { if (error) { onOpenError(error); - close(); + abortOpen(); return; } if (m_recvHeader.typeId() != HELLO_MESSAGE_ID) { onOpenError(Error::UNEXPECTED_MESSAGE_TYPE); - close(); + abortOpen(); return; } // 4. Receive the HELLO message body @@ -95,7 +95,7 @@ void Session::startOpen(const std::shared_ptr<Session>& self) [this, self, storage] (const boost::system::error_code& error, std::size_t) { if (error) { onOpenError(error); - close(); + abortOpen(); return; } m_remoteName.insert(m_remoteName.begin(), storage->begin(), storage->end()); @@ -106,18 +106,59 @@ void Session::startOpen(const std::shared_ptr<Session>& self) })); } -void Session::close() +void Session::abortOpen() +{ + // abortOpen() is called if there was an error in one of the I/O operations in asyncOpen() or + // startOpen(). We have to distinguish two cases: + // 1) The error occurred because asyncClose() closed m_socket + // 2) There was a "real" I/O error + + if (m_state == State::CLOSE_PENDING) { + // case 1) + onClose(); + } + else { + // case 2) + boost::system::error_code ignoredError; + m_socket.close(ignoredError); + } + m_localName.clear(); + m_remoteName.clear(); + m_state = State::CLOSED; +} + + +void Session::asyncClose() { - m_socket.close(); auto self = shared_from_this(); m_strand.dispatch( [this, self] () { - m_localName.clear(); - m_remoteName.clear(); - m_state = State::CLOSED; + if (m_state != State::OPEN && m_state != State::OPEN_PENDING) { + // use post to avoid stack overflows if the user calls asyncClose again from onCloseError() + m_strand.post( + [this, self] () { + onCloseError(Error::SESSION_CLOSED); + }); + } + else { + m_state = State::CLOSE_PENDING; + boost::system::error_code ignoredError; + m_socket.close(ignoredError); + checkClose(); + } }); } +void Session::checkClose() +{ + if (m_state == State::CLOSE_PENDING && m_recvNPending == 0 && m_sendQueue.empty()) { + onClose(); + m_localName.clear(); + m_remoteName.clear(); + m_state = State::CLOSED; + } +} + void Session::asyncReceive() { auto self = shared_from_this(); @@ -164,6 +205,9 @@ void Session::startReceive(const std::shared_ptr<Session>& self) if (--m_recvNPending != 0) { startReceive(self); } + else { + checkClose(); + } })); return; } @@ -178,6 +222,9 @@ void Session::startReceive(const std::shared_ptr<Session>& self) if (--m_recvNPending != 0) { startReceive(self); } + else { + checkClose(); + } })); return; } @@ -185,6 +232,9 @@ void Session::startReceive(const std::shared_ptr<Session>& self) if (--m_recvNPending != 0) { startReceive(self); } + else { + checkClose(); + } })); } @@ -251,6 +301,9 @@ void Session::startSend(const std::shared_ptr<Session>& self) // there are outstanding messages to send: start a new send operation startSend(self); } + else { + checkClose(); + } })); }