diff --git a/sw/src/userspace/data_server.hpp b/sw/src/userspace/data_server.hpp index 0630bb5..3b90b1b 100644 --- a/sw/src/userspace/data_server.hpp +++ b/sw/src/userspace/data_server.hpp @@ -115,6 +115,12 @@ public: asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port); m_acceptor.bind(addr); + // Clear buffer, then enable DMA stream. + if (! m_connection.is_open()) { + discard_stale_data(); + m_dma_stream.set_enabled(true); + } + // Start listening for connections. m_acceptor.listen(); @@ -133,9 +139,6 @@ public: */ void stop_server() { - // Disable DMA stream and clear buffer. - m_dma_stream.init(); - // Stop accepting connections. if (m_acceptor.is_open()) { log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port); @@ -149,6 +152,34 @@ public: m_stale_receive = true; m_stale_send = m_send_in_progress; } + + // Disable DMA stream and clear buffer. + m_dma_stream.init(); + } + + /** + * Disconnect the current client and discard all pending data. + * + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". + */ + void clear_data() + { + // Close the current connection. + if (m_connection.is_open()) { + log(LOG_INFO, "Closing connection to port %d", m_tcp_port); + m_connection.close(); + m_stale_receive = true; + m_stale_send = m_send_in_progress; + } + + // Disable DMA stream and clear buffer. + discard_stale_data(); + + // Re-enable DMA stream. + if (m_acceptor.is_open()) { + m_dma_stream.set_enabled(true); + } } /** @@ -272,10 +303,6 @@ private: [this](auto ec, size_t n){ handle_receive(ec, n); }); } - // Clear buffer, then enable DMA stream. - discard_stale_data(); - m_dma_stream.set_enabled(true); - // Prepare to send data. transmit_data(false); } diff --git a/sw/src/userspace/remotectl.cpp b/sw/src/userspace/remotectl.cpp index 450d6d2..091d919 100644 --- a/sw/src/userspace/remotectl.cpp +++ b/sw/src/userspace/remotectl.cpp @@ -481,6 +481,8 @@ public: , m_model_name(model_name) , m_serial_number(serial_number) , m_control_server(nullptr) + , m_acquisition_server(nullptr) + , m_timetagger_server(nullptr) , m_shutting_down(false) , m_exit_status(EXIT_ERROR) { } @@ -495,10 +497,13 @@ public: m_control_server = &control_server; } - /** Register a data server with the command handler. */ - void add_data_server(DataServer& data_server) + /** Register the data servers with the command handler. */ + void set_data_servers(DataServer& acquisition_server, + DataServer& timetagger_server) { - m_data_servers.push_back(&data_server); + m_acquisition_server = &acquisition_server; + m_timetagger_server = &timetagger_server; + m_data_servers = {m_acquisition_server, m_timetagger_server}; } /** Return the exit status of the command handler. */ @@ -551,17 +556,20 @@ public: * If a multi-threaded Asio event loop is running, this method may only * be called through the strand returned by "get_executor()". * - * Returns: - * Response without line terminator, - * or an empty string if no response must be sent. + * The specified response function will be called exactly once to pass + * the command response (without line terminator). An empty string is + * passed if no response must be sent. + * + * The call to the response function may be deferred. */ - std::string handle_command(const std::string& command) + void handle_command(const std::string& command, + std::function respond) { if (m_shutting_down) { // The server is shutting down. - // Ignore new commands and return an empty string to indicate - // that no response must be sent. - return std::string(); + // Ignore new commands and send no response. + respond(std::string()); + return; } // Split words. @@ -569,7 +577,8 @@ public: // Ignore empty command line without response. if (tokens.empty()) { - return std::string(); + respond(std::string()); + return; } // Convert command to lower case. @@ -584,7 +593,8 @@ public: && action[7] == ':') { char channel_digit = action[6]; if (channel_digit < '1' || channel_digit > '4') { - return err_unknown_command(); + respond(err_unknown_command()); + return; } env.channel = channel_digit - '1'; action[6] = 'N'; // mark channel index @@ -622,10 +632,13 @@ public: const auto it = command_table_no_args.find(action); if (it != command_table_no_args.end()) { if (tokens.size() != 1) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } auto func = it->second; - return (this->*func)(env); + std::string resp = (this->*func)(env); + respond(resp); + return; } } @@ -634,37 +647,57 @@ public: const auto it = command_table_one_arg.find(action); if (it != command_table_one_arg.end()) { if (tokens.size() < 2) { - return err_missing_argument(); + respond(err_missing_argument()); + return; } if (tokens.size() > 2) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } auto func = it->second; - return (this->*func)(env, tokens[1]); + std::string resp = (this->*func)(env, tokens[1]); + respond(resp); + return; } } // Handle command IPCFG. if (action == "ipcfg" || action == "ipcfg:saved") { if (tokens.size() < 2) { - return err_missing_argument(); + respond(err_missing_argument()); + return; } if (tokens.size() > 5) { - return err_unexpected_argument(); + respond(err_unexpected_argument()); + return; } tokens.erase(tokens.begin()); - return cmd_ipcfg(env, tokens); + std::string resp = cmd_ipcfg(env, tokens); + respond(resp); + return; } - return err_unknown_command(); + // Handle command AIN:CLEAR. + if (action == "ain:clear") { + cmd_ain_clear(respond); + return; + } + + // Handle command TT:CLEAR. + if (action == "tt:clear") { + cmd_tt_clear(respond); + return; + } + + respond(err_unknown_command()); } private: - /** Asynchronously stop control and/or data servers. */ + /** Asynchronously stop control and data servers. */ void stop_server(std::function handler); void stop_data_servers(unsigned int idx, std::function handler); - /** Asynchronously start control and/or data servers. */ + /** Asynchronously start control and data servers. */ void start_server(); void start_data_servers(unsigned int idx); @@ -1365,6 +1398,26 @@ private: } } + /** Handle command AIN:CLEAR */ + void cmd_ain_clear(std::function respond) + { + asio::post(m_acquisition_server->get_executor(), + [this,respond]() { + m_acquisition_server->clear_data(); + respond("OK"); + }); + } + + /** Handle command TT:CLEAR */ + void cmd_tt_clear(std::function respond) + { + asio::post(m_timetagger_server->get_executor(), + [this,respond]() { + m_timetagger_server->clear_data(); + respond("OK"); + }); + } + static const inline std::map< std::string, std::string (CommandHandler::*)(CommandEnvironment)> @@ -1436,6 +1489,8 @@ private: std::string m_model_name; std::string m_serial_number; ControlServer* m_control_server; + DataServer* m_acquisition_server; + DataServer* m_timetagger_server; std::vector m_data_servers; Calibration m_calibration; bool m_shutting_down; @@ -1715,17 +1770,22 @@ private: m_command_handler.get_executor(), [this,conn,command]() { // This code runs in the command handler strand. - // Tell the command handler to run the command. - auto response = m_command_handler.handle_command(command); - // Post the response back to our own strand. - asio::post( - m_strand, - [this,conn,response]() { - // This code runs in our own strand. - // Handle the response. - process_response(conn, response); - }); + // Define helper function which will be called by + // the command handler to pass a response. + auto respond = [this,conn](std::string response) { + // Post the response back to our own strand. + asio::post( + m_strand, + [this,conn,response]() { + // This code runs in our own strand. + // Handle the response. + process_response(conn, response); + }); + }; + + // Tell the command handler to run the command. + m_command_handler.handle_command(command, respond); }); } @@ -1878,8 +1938,7 @@ int run_remote_control_server( ControlServer control_server(io, command_handler, 5025); command_handler.set_control_server(control_server); - command_handler.add_data_server(acq_server); - command_handler.add_data_server(timetagger_server); + command_handler.set_data_servers(acq_server, timetagger_server); // Disable DMA on exit from this function. struct ScopeGuard {