From b32200ba2f375690c5811800247a0e4163d41e6d Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Mon, 20 Oct 2025 19:50:31 +0200 Subject: [PATCH] Avoid race condition on new data connection The previous code would clear DMA buffers when a new TCP data connection is established. This is bad, because the completion of the server side accept() call happens at some undefined moment after completion of the client side connect() call. The client may in the mean time enable data acquisition via the control connection. This will lead to data loss if the server subsequently clears DMA buffers while acquisition is already in progress. This commit makes two changes: - The server does not clear its DMA buffers when it accepts a new connection. - New commands "AIN:CLEAR" and "TT:CLEAR" are added which explicitly clear DMA buffers and drop the current data connection, if any. --- sw/src/userspace/data_server.hpp | 41 ++++++++-- sw/src/userspace/remotectl.cpp | 129 ++++++++++++++++++++++--------- 2 files changed, 128 insertions(+), 42 deletions(-) diff --git a/sw/src/userspace/data_server.hpp b/sw/src/userspace/data_server.hpp index 0630bb5..3b90b1b 100644 --- a/sw/src/userspace/data_server.hpp +++ b/sw/src/userspace/data_server.hpp @@ -115,6 +115,12 @@ public: asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port); m_acceptor.bind(addr); + // Clear buffer, then enable DMA stream. + if (! m_connection.is_open()) { + discard_stale_data(); + m_dma_stream.set_enabled(true); + } + // Start listening for connections. m_acceptor.listen(); @@ -133,9 +139,6 @@ public: */ void stop_server() { - // Disable DMA stream and clear buffer. - m_dma_stream.init(); - // Stop accepting connections. if (m_acceptor.is_open()) { log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port); @@ -149,6 +152,34 @@ public: m_stale_receive = true; m_stale_send = m_send_in_progress; } + + // Disable DMA stream and clear buffer. + m_dma_stream.init(); + } + + /** + * Disconnect the current client and discard all pending data. + * + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". + */ + void clear_data() + { + // Close the current connection. + if (m_connection.is_open()) { + log(LOG_INFO, "Closing connection to port %d", m_tcp_port); + m_connection.close(); + m_stale_receive = true; + m_stale_send = m_send_in_progress; + } + + // Disable DMA stream and clear buffer. + discard_stale_data(); + + // Re-enable DMA stream. + if (m_acceptor.is_open()) { + m_dma_stream.set_enabled(true); + } } /** @@ -272,10 +303,6 @@ private: [this](auto ec, size_t n){ handle_receive(ec, n); }); } - // Clear buffer, then enable DMA stream. - discard_stale_data(); - m_dma_stream.set_enabled(true); - // Prepare to send data. transmit_data(false); } diff --git a/sw/src/userspace/remotectl.cpp b/sw/src/userspace/remotectl.cpp index 450d6d2..091d919 100644 --- a/sw/src/userspace/remotectl.cpp +++ b/sw/src/userspace/remotectl.cpp @@ -481,6 +481,8 @@ public: , m_model_name(model_name) , m_serial_number(serial_number) , m_control_server(nullptr) + , m_acquisition_server(nullptr) + , m_timetagger_server(nullptr) , m_shutting_down(false) , m_exit_status(EXIT_ERROR) { } @@ -495,10 +497,13 @@ public: m_control_server = &control_server; } - /** Register a data server with the command handler. */ - void add_data_server(DataServer& data_server) + /** Register the data servers with the command handler. */ + void set_data_servers(DataServer& acquisition_server, + DataServer& timetagger_server) { - m_data_servers.push_back(&data_server); + m_acquisition_server = &acquisition_server; + m_timetagger_server = &timetagger_server; + m_data_servers = {m_acquisition_server, m_timetagger_server}; } /** Return the exit status of the command handler. */ @@ -551,17 +556,20 @@ public: * If a multi-threaded Asio event loop is running, this method may only * be called through the strand returned by "get_executor()". * - * Returns: - * Response without line terminator, - * or an empty string if no response must be sent. + * The specified response function will be called exactly once to pass + * the command response (without line terminator). An empty string is + * passed if no response must be sent. + * + * The call to the response function may be deferred. */ - std::string handle_command(const std::string& command) + void handle_command(const std::string& command, + std::function respond) { if (m_shutting_down) { // The server is shutting down. - // Ignore new commands and return an empty string to indicate - // that no response must be sent. - return std::string(); + // Ignore new commands and send no response. + respond(std::string()); + return; } // Split words. @@ -569,7 +577,8 @@ public: // Ignore empty command line without response. if (tokens.empty()) { - return std::string(); + respond(std::string()); + return; } // Convert command to lower case. @@ -584,7 +593,8 @@ public: && action[7] == ':') { char channel_digit = action[6]; if (channel_digit < '1' || channel_digit > '4') { - return err_unknown_command(); + respond(err_unknown_command()); + return; } env.channel = channel_digit - '1'; action[6] = 'N'; // mark channel index @@ -622,10 +632,13 @@ public: const auto it = command_table_no_args.find(action); if (it != command_table_no_args.end()) { if (tokens.size() != 1) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } auto func = it->second; - return (this->*func)(env); + std::string resp = (this->*func)(env); + respond(resp); + return; } } @@ -634,37 +647,57 @@ public: const auto it = command_table_one_arg.find(action); if (it != command_table_one_arg.end()) { if (tokens.size() < 2) { - return err_missing_argument(); + respond(err_missing_argument()); + return; } if (tokens.size() > 2) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } auto func = it->second; - return (this->*func)(env, tokens[1]); + std::string resp = (this->*func)(env, tokens[1]); + respond(resp); + return; } } // Handle command IPCFG. if (action == "ipcfg" || action == "ipcfg:saved") { if (tokens.size() < 2) { - return err_missing_argument(); + respond(err_missing_argument()); + return; } if (tokens.size() > 5) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } tokens.erase(tokens.begin()); - return cmd_ipcfg(env, tokens); + std::string resp = cmd_ipcfg(env, tokens); + respond(resp); + return; } - return err_unknown_command(); + // Handle command AIN:CLEAR. + if (action == "ain:clear") { + cmd_ain_clear(respond); + return; + } + + // Handle command TT:CLEAR. + if (action == "tt:clear") { + cmd_tt_clear(respond); + return; + } + + respond(err_unknown_command()); } private: - /** Asynchronously stop control and/or data servers. */ + /** Asynchronously stop control and data servers. */ void stop_server(std::function handler); void stop_data_servers(unsigned int idx, std::function handler); - /** Asynchronously start control and/or data servers. */ + /** Asynchronously start control and data servers. */ void start_server(); void start_data_servers(unsigned int idx); @@ -1365,6 +1398,26 @@ private: } } + /** Handle command AIN:CLEAR */ + void cmd_ain_clear(std::function respond) + { + asio::post(m_acquisition_server->get_executor(), + [this,respond]() { + m_acquisition_server->clear_data(); + respond("OK"); + }); + } + + /** Handle command TT:CLEAR */ + void cmd_tt_clear(std::function respond) + { + asio::post(m_timetagger_server->get_executor(), + [this,respond]() { + m_timetagger_server->clear_data(); + respond("OK"); + }); + } + static const inline std::map< std::string, std::string (CommandHandler::*)(CommandEnvironment)> @@ -1436,6 +1489,8 @@ private: std::string m_model_name; std::string m_serial_number; ControlServer* m_control_server; + DataServer* m_acquisition_server; + DataServer* m_timetagger_server; std::vector m_data_servers; Calibration m_calibration; bool m_shutting_down; @@ -1715,17 +1770,22 @@ private: m_command_handler.get_executor(), [this,conn,command]() { // This code runs in the command handler strand. - // Tell the command handler to run the command. - auto response = m_command_handler.handle_command(command); - // Post the response back to our own strand. - asio::post( - m_strand, - [this,conn,response]() { - // This code runs in our own strand. - // Handle the response. - process_response(conn, response); - }); + // Define helper function which will be called by + // the command handler to pass a response. + auto respond = [this,conn](std::string response) { + // Post the response back to our own strand. + asio::post( + m_strand, + [this,conn,response]() { + // This code runs in our own strand. + // Handle the response. + process_response(conn, response); + }); + }; + + // Tell the command handler to run the command. + m_command_handler.handle_command(command, respond); }); } @@ -1878,8 +1938,7 @@ int run_remote_control_server( ControlServer control_server(io, command_handler, 5025); command_handler.set_control_server(control_server); - command_handler.add_data_server(acq_server); - command_handler.add_data_server(timetagger_server); + command_handler.set_data_servers(acq_server, timetagger_server); // Disable DMA on exit from this function. struct ScopeGuard {