From a7802d11e3d10ee11251fee781d04403de68fcf5 Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Fri, 20 Sep 2024 23:42:14 +0200 Subject: [PATCH] Discard data on new TCP connection --- os/src/userspace/data_server.hpp | 93 ++++++++++++++++++++++++-------- os/src/userspace/puzzlecmd.cpp | 3 +- os/src/userspace/puzzlefw.hpp | 30 +++++++++-- 3 files changed, 98 insertions(+), 28 deletions(-) diff --git a/os/src/userspace/data_server.hpp b/os/src/userspace/data_server.hpp index 956a513..666dd7a 100644 --- a/os/src/userspace/data_server.hpp +++ b/os/src/userspace/data_server.hpp @@ -74,11 +74,19 @@ public: /** Destructor. */ ~DataServer() { + // Disable DMA stream and clear buffer. + m_dma_stream.init(); + // Disable interrupts. m_dma_stream.disable_interrupt(); } - /** Start the server. */ + /** + * Start the server. + * + * This method must not be called directly while a multi-threaded Asio + * event loop is running. In that case, use "async_start_server()". + */ void start_server() { // If the server is already open, close and re-open it. @@ -109,9 +117,17 @@ public: log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port); } - /** Stop the server and close current connections. */ + /** + * Stop the server and close current connections. + * + * This method must not be called directly while a multi-threaded Asio + * event loop is running. In that case, use "async_stop_server()". + */ void stop_server() { + // Disable DMA stream and clear buffer. + m_dma_stream.init(); + // Stop accepting connections. if (m_acceptor.is_open()) { log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port); @@ -127,6 +143,46 @@ public: } } + /** + * Submit a call to "start_server()" to be executed in the strand + * of this server instance. + * + * After starting the server, the specified completion handler will + * be submitted for execution. + * + * This method may safely be called from any thread. + */ + template + void async_start_server(Handler&& handler) + { + asio::post( + m_strand, + [this, handler] () mutable { + start_server(); + asio::post(m_strand, handler); + }); + } + + /** + * Submit a call to "stop_server()" to be executed in the strand + * of this server instance. + * + * After stopping the server, the specified completion handler will + * be submitted for execution. + * + * This method may safely be called from any thread. + */ + template + void async_stop_server(Handler&& handler) + { + asio::post( + m_strand, + [this, handler] () mutable { + stop_server(); + asio::post(m_strand, handler); + }); + } + /** * Called when an FPGA interrupt occurs. * @@ -134,7 +190,7 @@ public: * Before returning, this function must disable and clear any pending * interrupt for the DMA stream. * - * This function may be called outside the strand of this instance. + * This function may safely be called from any thread. */ void handle_interrupt() { @@ -224,6 +280,10 @@ private: [this](auto& ec, size_t n){ handle_receive(ec, n); }); } + // Clear buffer, then enable DMA stream. + m_dma_stream.init(); + m_dma_stream.set_enabled(true); + // Try to send some data. transmit_data(false); } @@ -276,14 +336,14 @@ private: /** Send completion handler. */ void handle_send(const boost::system::error_code& error, size_t len) { + assert(m_send_in_progress); + assert(m_send_size > 0); + m_send_in_progress = false; if (m_stale_send) { // This completion refers to an old, already closed connection. m_stale_send = false; - - // Discard the remaining data that was part of this send. - send_completed(); return; } @@ -306,8 +366,6 @@ private: m_stale_receive = true; } - // Discard the remaining data that was part of this send. - send_completed(); return; } @@ -320,23 +378,14 @@ private: [this](auto& ec, size_t n){ handle_send(ec, n); }); } else { // Fully completed. - send_completed(); + // Release the completed part of the DMA buffer. + m_dma_stream.consume_data(m_send_size); + + // Try to send more data. + transmit_data(false); } } - /** Called when a send operation is fully completed. */ - void send_completed() - { - assert(! m_send_in_progress); - assert(m_send_size > 0); - - // Release the completed part of the DMA buffer. - m_dma_stream.consume_data(m_send_size); - - // Try to send more data. - transmit_data(false); - } - /** * Try to send data from the DMA buffer to the TCP connection. * diff --git a/os/src/userspace/puzzlecmd.cpp b/os/src/userspace/puzzlecmd.cpp index 474c9fd..e30a034 100644 --- a/os/src/userspace/puzzlecmd.cpp +++ b/os/src/userspace/puzzlecmd.cpp @@ -165,10 +165,9 @@ void run_data_server(puzzlefw::PuzzleFwDevice& device) device.clear_dma_errors(); device.set_dma_enabled(true); + // Enable data servers. acq_server.start_server(); timetagger_server.start_server(); - acq_stream.set_enabled(true); - timetagger_stream.set_enabled(true); log(LOG_INFO, "Running, press Ctrl-C to stop"); io.run(); diff --git a/os/src/userspace/puzzlefw.hpp b/os/src/userspace/puzzlefw.hpp index 35eeb1a..e7ce952 100644 --- a/os/src/userspace/puzzlefw.hpp +++ b/os/src/userspace/puzzlefw.hpp @@ -715,8 +715,8 @@ public: m_device.write_reg(m_reg.addr_end, m_buf_end); m_device.write_reg(m_reg.addr_limit, m_buf_end - POINTER_MARGIN); - // Initialize DMA stream (reset write pointer to start of segment). - m_device.write_reg(m_reg.dma_ctrl, 2); + // Initialize DMA stream and reset write pointer. + init(); } // Delete copy constructor and assignment operator. @@ -726,8 +726,7 @@ public: /** Disable DMA stream. Remaining data in buffer is discarded. */ ~DmaWriteStream() { - // Disable DMA stream and re-initialize. - m_device.write_reg(m_reg.dma_ctrl, 0); + // Disable and reset DMA stream. m_device.write_reg(m_reg.dma_ctrl, 2); // Disable DMA stream interrupt and clear pending interrupt. @@ -752,6 +751,29 @@ public: m_device.write_reg(m_reg.dma_ctrl, enable ? 1 : 0); } + /** Disable DMA stream and discard all data from the buffer. */ + void init() + { + // Disable DMA stream. + m_device.write_reg(m_reg.dma_ctrl, 0); + + // Wait until DMA completed or timeout. + for (int i = 0; i < 16; i++) { + if ((m_device.read_reg(m_reg.dma_status) & 1) == 0) { + break; + } + } + + // Reset write pointer, clear buffer. + m_device.write_reg(m_reg.dma_ctrl, 2); + + // Disable and clear interrupt. + disable_interrupt(); + + // Reset read pointer. + m_read_pointer = m_buf_start; + } + /** Return the number of bytes available in the DMA buffer. */ size_t get_data_available() {