Avoid race condition on new data connection

The previous code would clear DMA buffers when a new TCP
data connection is established. This is bad, because the
completion of the server side accept() call happens at
some undefined moment after completion of the client side
connect() call. The client may in the mean time enable
data acquisition via the control connection. This will
lead to data loss if the server subsequently clears
DMA buffers while acquisition is already in progress.

This commit makes two changes:
 - The server does not clear its DMA buffers when it accepts
   a new connection.
 - New commands "AIN:CLEAR" and "TT:CLEAR" are added which
   explicitly clear DMA buffers and drop the current data
   connection, if any.
This commit is contained in:
Joris van Rantwijk 2025-10-20 19:50:31 +02:00
parent 7cf66e031e
commit b32200ba2f
2 changed files with 128 additions and 42 deletions

View File

@ -115,6 +115,12 @@ public:
asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port);
m_acceptor.bind(addr);
// Clear buffer, then enable DMA stream.
if (! m_connection.is_open()) {
discard_stale_data();
m_dma_stream.set_enabled(true);
}
// Start listening for connections.
m_acceptor.listen();
@ -133,9 +139,6 @@ public:
*/
void stop_server()
{
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Stop accepting connections.
if (m_acceptor.is_open()) {
log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port);
@ -149,6 +152,34 @@ public:
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
// Disable DMA stream and clear buffer.
m_dma_stream.init();
}
/**
* Disconnect the current client and discard all pending data.
*
* If a multi-threaded Asio event loop is running, this method may only
* be called through the strand returned by "get_executor()".
*/
void clear_data()
{
// Close the current connection.
if (m_connection.is_open()) {
log(LOG_INFO, "Closing connection to port %d", m_tcp_port);
m_connection.close();
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
// Disable DMA stream and clear buffer.
discard_stale_data();
// Re-enable DMA stream.
if (m_acceptor.is_open()) {
m_dma_stream.set_enabled(true);
}
}
/**
@ -272,10 +303,6 @@ private:
[this](auto ec, size_t n){ handle_receive(ec, n); });
}
// Clear buffer, then enable DMA stream.
discard_stale_data();
m_dma_stream.set_enabled(true);
// Prepare to send data.
transmit_data(false);
}

View File

@ -481,6 +481,8 @@ public:
, m_model_name(model_name)
, m_serial_number(serial_number)
, m_control_server(nullptr)
, m_acquisition_server(nullptr)
, m_timetagger_server(nullptr)
, m_shutting_down(false)
, m_exit_status(EXIT_ERROR)
{ }
@ -495,10 +497,13 @@ public:
m_control_server = &control_server;
}
/** Register a data server with the command handler. */
void add_data_server(DataServer& data_server)
/** Register the data servers with the command handler. */
void set_data_servers(DataServer& acquisition_server,
DataServer& timetagger_server)
{
m_data_servers.push_back(&data_server);
m_acquisition_server = &acquisition_server;
m_timetagger_server = &timetagger_server;
m_data_servers = {m_acquisition_server, m_timetagger_server};
}
/** Return the exit status of the command handler. */
@ -551,17 +556,20 @@ public:
* If a multi-threaded Asio event loop is running, this method may only
* be called through the strand returned by "get_executor()".
*
* Returns:
* Response without line terminator,
* or an empty string if no response must be sent.
* The specified response function will be called exactly once to pass
* the command response (without line terminator). An empty string is
* passed if no response must be sent.
*
* The call to the response function may be deferred.
*/
std::string handle_command(const std::string& command)
void handle_command(const std::string& command,
std::function<void(std::string)> respond)
{
if (m_shutting_down) {
// The server is shutting down.
// Ignore new commands and return an empty string to indicate
// that no response must be sent.
return std::string();
// Ignore new commands and send no response.
respond(std::string());
return;
}
// Split words.
@ -569,7 +577,8 @@ public:
// Ignore empty command line without response.
if (tokens.empty()) {
return std::string();
respond(std::string());
return;
}
// Convert command to lower case.
@ -584,7 +593,8 @@ public:
&& action[7] == ':') {
char channel_digit = action[6];
if (channel_digit < '1' || channel_digit > '4') {
return err_unknown_command();
respond(err_unknown_command());
return;
}
env.channel = channel_digit - '1';
action[6] = 'N'; // mark channel index
@ -622,10 +632,13 @@ public:
const auto it = command_table_no_args.find(action);
if (it != command_table_no_args.end()) {
if (tokens.size() != 1) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
auto func = it->second;
return (this->*func)(env);
std::string resp = (this->*func)(env);
respond(resp);
return;
}
}
@ -634,37 +647,57 @@ public:
const auto it = command_table_one_arg.find(action);
if (it != command_table_one_arg.end()) {
if (tokens.size() < 2) {
return err_missing_argument();
respond(err_missing_argument());
return;
}
if (tokens.size() > 2) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
auto func = it->second;
return (this->*func)(env, tokens[1]);
std::string resp = (this->*func)(env, tokens[1]);
respond(resp);
return;
}
}
// Handle command IPCFG.
if (action == "ipcfg" || action == "ipcfg:saved") {
if (tokens.size() < 2) {
return err_missing_argument();
respond(err_missing_argument());
return;
}
if (tokens.size() > 5) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
tokens.erase(tokens.begin());
return cmd_ipcfg(env, tokens);
std::string resp = cmd_ipcfg(env, tokens);
respond(resp);
return;
}
return err_unknown_command();
// Handle command AIN:CLEAR.
if (action == "ain:clear") {
cmd_ain_clear(respond);
return;
}
// Handle command TT:CLEAR.
if (action == "tt:clear") {
cmd_tt_clear(respond);
return;
}
respond(err_unknown_command());
}
private:
/** Asynchronously stop control and/or data servers. */
/** Asynchronously stop control and data servers. */
void stop_server(std::function<void()> handler);
void stop_data_servers(unsigned int idx, std::function<void()> handler);
/** Asynchronously start control and/or data servers. */
/** Asynchronously start control and data servers. */
void start_server();
void start_data_servers(unsigned int idx);
@ -1365,6 +1398,26 @@ private:
}
}
/** Handle command AIN:CLEAR */
void cmd_ain_clear(std::function<void(std::string)> respond)
{
asio::post(m_acquisition_server->get_executor(),
[this,respond]() {
m_acquisition_server->clear_data();
respond("OK");
});
}
/** Handle command TT:CLEAR */
void cmd_tt_clear(std::function<void(std::string)> respond)
{
asio::post(m_timetagger_server->get_executor(),
[this,respond]() {
m_timetagger_server->clear_data();
respond("OK");
});
}
static const inline std::map<
std::string,
std::string (CommandHandler::*)(CommandEnvironment)>
@ -1436,6 +1489,8 @@ private:
std::string m_model_name;
std::string m_serial_number;
ControlServer* m_control_server;
DataServer* m_acquisition_server;
DataServer* m_timetagger_server;
std::vector<DataServer*> m_data_servers;
Calibration m_calibration;
bool m_shutting_down;
@ -1715,9 +1770,10 @@ private:
m_command_handler.get_executor(),
[this,conn,command]() {
// This code runs in the command handler strand.
// Tell the command handler to run the command.
auto response = m_command_handler.handle_command(command);
// Define helper function which will be called by
// the command handler to pass a response.
auto respond = [this,conn](std::string response) {
// Post the response back to our own strand.
asio::post(
m_strand,
@ -1726,6 +1782,10 @@ private:
// Handle the response.
process_response(conn, response);
});
};
// Tell the command handler to run the command.
m_command_handler.handle_command(command, respond);
});
}
@ -1878,8 +1938,7 @@ int run_remote_control_server(
ControlServer control_server(io, command_handler, 5025);
command_handler.set_control_server(control_server);
command_handler.add_data_server(acq_server);
command_handler.add_data_server(timetagger_server);
command_handler.set_data_servers(acq_server, timetagger_server);
// Disable DMA on exit from this function.
struct ScopeGuard {