Compare commits

...

4 Commits

Author SHA1 Message Date
Joris van Rantwijk 2ab072b254 Document commands AIN:CLEAR and TT:CLEAR 2025-10-21 08:44:58 +02:00
Joris van Rantwijk 98058d6291 Bump software version to 1.1 2025-10-21 08:33:17 +02:00
Joris van Rantwijk b32200ba2f Avoid race condition on new data connection
The previous code would clear DMA buffers when a new TCP
data connection is established. This is bad, because the
completion of the server side accept() call happens at
some undefined moment after completion of the client side
connect() call. The client may in the mean time enable
data acquisition via the control connection. This will
lead to data loss if the server subsequently clears
DMA buffers while acquisition is already in progress.

This commit makes two changes:
 - The server does not clear its DMA buffers when it accepts
   a new connection.
 - New commands "AIN:CLEAR" and "TT:CLEAR" are added which
   explicitly clear DMA buffers and drop the current data
   connection, if any.
2025-10-21 08:32:46 +02:00
Joris van Rantwijk 7cf66e031e Reset limit pointer during DMA re-init 2025-10-20 12:19:32 +02:00
5 changed files with 156 additions and 44 deletions

View File

@ -351,9 +351,11 @@ In the response string, such data elements are separated by space characters.
| `AIN:TRIGGER:EXT:CHANNEL` | External trigger channel. |
| `AIN:TRIGGER:EXT:EDGE` | External trigger edge. |
| `AIN:ACQUIRE:ENABLE` | Enable analog acquisition. |
| `AIN:CLEAR` | Clear buffer and drop data connection. |
| `TT:SAMPLE?` | Digital input state. |
| `TT:EVENT:MASK` | Timetagger event mask. |
| `TT:MARK` | Emit timetagger marker. |
| `TT:CLEAR` | Clear buffer and drop data connection. |
| `TEMP:FPGA?` | FPGA temperature. |
| `IPCFG[:SAVED]` | IP address configuration. |
| `HALT` | Shut down system. |
@ -626,6 +628,17 @@ When disabled, all triggers are ignored and any ongoing analog acquisition stops
Query: `AIN:ACQUIRE:ENABLE?` <br>
Response: either `0` or `1`.
### `AIN:CLEAR`
Command: `AIN:CLEAR`
This command clears internal buffers holding analog acquisition data.
It also drops the current data connection to port 5001, if any.
Clients should send this command just before connecting to the analog
sample data port and enabling analog data acquisition.
This prevents the transmission of stale data from a previous run.
### `TT:SAMPLE?`
Query: `TT:SAMPLE?` <br>
@ -662,6 +675,17 @@ Command: `TT:MARK`
This command emits a marker record in the timetagger event stream.
### `TT:CLEAR`
Command: `TT:CLEAR`
This command clears internal buffers holding timetagger data.
It also drops the current data connection to port 5002, if any.
Clients should send this command just before connecting to the timetagger
data port and enabling timetagger events.
This prevents the transmission of stale data from a previous run.
### `TEMP:FPGA?`
Query: `TEMP:FPGA?` <br>

View File

@ -115,6 +115,12 @@ public:
asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port);
m_acceptor.bind(addr);
// Clear buffer, then enable DMA stream.
if (! m_connection.is_open()) {
discard_stale_data();
m_dma_stream.set_enabled(true);
}
// Start listening for connections.
m_acceptor.listen();
@ -133,9 +139,6 @@ public:
*/
void stop_server()
{
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Stop accepting connections.
if (m_acceptor.is_open()) {
log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port);
@ -149,6 +152,34 @@ public:
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
// Disable DMA stream and clear buffer.
m_dma_stream.init();
}
/**
* Disconnect the current client and discard all pending data.
*
* If a multi-threaded Asio event loop is running, this method may only
* be called through the strand returned by "get_executor()".
*/
void clear_data()
{
// Close the current connection.
if (m_connection.is_open()) {
log(LOG_INFO, "Closing connection to port %d", m_tcp_port);
m_connection.close();
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
// Disable DMA stream and clear buffer.
discard_stale_data();
// Re-enable DMA stream.
if (m_acceptor.is_open()) {
m_dma_stream.set_enabled(true);
}
}
/**
@ -272,10 +303,6 @@ private:
[this](auto ec, size_t n){ handle_receive(ec, n); });
}
// Clear buffer, then enable DMA stream.
discard_stale_data();
m_dma_stream.set_enabled(true);
// Prepare to send data.
transmit_data(false);
}

View File

@ -732,7 +732,6 @@ public:
// Set up DMA buffer segment.
m_device.write_reg(m_reg.addr_start, m_buf_start);
m_device.write_reg(m_reg.addr_end, m_buf_end);
m_device.write_reg(m_reg.addr_limit, m_buf_end - POINTER_MARGIN);
// Initialize DMA stream and reset write pointer.
init();
@ -791,6 +790,9 @@ public:
// Reset read pointer.
m_read_pointer = m_buf_start;
// Reset write limit pointer.
m_device.write_reg(m_reg.addr_limit, m_buf_end - POINTER_MARGIN);
}
/** Return the number of bytes available in the DMA buffer. */

View File

@ -481,6 +481,8 @@ public:
, m_model_name(model_name)
, m_serial_number(serial_number)
, m_control_server(nullptr)
, m_acquisition_server(nullptr)
, m_timetagger_server(nullptr)
, m_shutting_down(false)
, m_exit_status(EXIT_ERROR)
{ }
@ -495,10 +497,13 @@ public:
m_control_server = &control_server;
}
/** Register a data server with the command handler. */
void add_data_server(DataServer& data_server)
/** Register the data servers with the command handler. */
void set_data_servers(DataServer& acquisition_server,
DataServer& timetagger_server)
{
m_data_servers.push_back(&data_server);
m_acquisition_server = &acquisition_server;
m_timetagger_server = &timetagger_server;
m_data_servers = {m_acquisition_server, m_timetagger_server};
}
/** Return the exit status of the command handler. */
@ -551,17 +556,20 @@ public:
* If a multi-threaded Asio event loop is running, this method may only
* be called through the strand returned by "get_executor()".
*
* Returns:
* Response without line terminator,
* or an empty string if no response must be sent.
* The specified response function will be called exactly once to pass
* the command response (without line terminator). An empty string is
* passed if no response must be sent.
*
* The call to the response function may be deferred.
*/
std::string handle_command(const std::string& command)
void handle_command(const std::string& command,
std::function<void(std::string)> respond)
{
if (m_shutting_down) {
// The server is shutting down.
// Ignore new commands and return an empty string to indicate
// that no response must be sent.
return std::string();
// Ignore new commands and send no response.
respond(std::string());
return;
}
// Split words.
@ -569,7 +577,8 @@ public:
// Ignore empty command line without response.
if (tokens.empty()) {
return std::string();
respond(std::string());
return;
}
// Convert command to lower case.
@ -584,7 +593,8 @@ public:
&& action[7] == ':') {
char channel_digit = action[6];
if (channel_digit < '1' || channel_digit > '4') {
return err_unknown_command();
respond(err_unknown_command());
return;
}
env.channel = channel_digit - '1';
action[6] = 'N'; // mark channel index
@ -622,10 +632,13 @@ public:
const auto it = command_table_no_args.find(action);
if (it != command_table_no_args.end()) {
if (tokens.size() != 1) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
auto func = it->second;
return (this->*func)(env);
std::string resp = (this->*func)(env);
respond(resp);
return;
}
}
@ -634,37 +647,57 @@ public:
const auto it = command_table_one_arg.find(action);
if (it != command_table_one_arg.end()) {
if (tokens.size() < 2) {
return err_missing_argument();
respond(err_missing_argument());
return;
}
if (tokens.size() > 2) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
auto func = it->second;
return (this->*func)(env, tokens[1]);
std::string resp = (this->*func)(env, tokens[1]);
respond(resp);
return;
}
}
// Handle command IPCFG.
if (action == "ipcfg" || action == "ipcfg:saved") {
if (tokens.size() < 2) {
return err_missing_argument();
respond(err_missing_argument());
return;
}
if (tokens.size() > 5) {
return err_unexpected_argument();
respond(err_unexpected_argument());
return;
}
tokens.erase(tokens.begin());
return cmd_ipcfg(env, tokens);
std::string resp = cmd_ipcfg(env, tokens);
respond(resp);
return;
}
return err_unknown_command();
// Handle command AIN:CLEAR.
if (action == "ain:clear") {
cmd_ain_clear(respond);
return;
}
// Handle command TT:CLEAR.
if (action == "tt:clear") {
cmd_tt_clear(respond);
return;
}
respond(err_unknown_command());
}
private:
/** Asynchronously stop control and/or data servers. */
/** Asynchronously stop control and data servers. */
void stop_server(std::function<void()> handler);
void stop_data_servers(unsigned int idx, std::function<void()> handler);
/** Asynchronously start control and/or data servers. */
/** Asynchronously start control and data servers. */
void start_server();
void start_data_servers(unsigned int idx);
@ -1365,6 +1398,26 @@ private:
}
}
/** Handle command AIN:CLEAR */
void cmd_ain_clear(std::function<void(std::string)> respond)
{
asio::post(m_acquisition_server->get_executor(),
[this,respond]() {
m_acquisition_server->clear_data();
respond("OK");
});
}
/** Handle command TT:CLEAR */
void cmd_tt_clear(std::function<void(std::string)> respond)
{
asio::post(m_timetagger_server->get_executor(),
[this,respond]() {
m_timetagger_server->clear_data();
respond("OK");
});
}
static const inline std::map<
std::string,
std::string (CommandHandler::*)(CommandEnvironment)>
@ -1436,6 +1489,8 @@ private:
std::string m_model_name;
std::string m_serial_number;
ControlServer* m_control_server;
DataServer* m_acquisition_server;
DataServer* m_timetagger_server;
std::vector<DataServer*> m_data_servers;
Calibration m_calibration;
bool m_shutting_down;
@ -1715,17 +1770,22 @@ private:
m_command_handler.get_executor(),
[this,conn,command]() {
// This code runs in the command handler strand.
// Tell the command handler to run the command.
auto response = m_command_handler.handle_command(command);
// Post the response back to our own strand.
asio::post(
m_strand,
[this,conn,response]() {
// This code runs in our own strand.
// Handle the response.
process_response(conn, response);
});
// Define helper function which will be called by
// the command handler to pass a response.
auto respond = [this,conn](std::string response) {
// Post the response back to our own strand.
asio::post(
m_strand,
[this,conn,response]() {
// This code runs in our own strand.
// Handle the response.
process_response(conn, response);
});
};
// Tell the command handler to run the command.
m_command_handler.handle_command(command, respond);
});
}
@ -1878,8 +1938,7 @@ int run_remote_control_server(
ControlServer control_server(io, command_handler, 5025);
command_handler.set_control_server(control_server);
command_handler.add_data_server(acq_server);
command_handler.add_data_server(timetagger_server);
command_handler.set_data_servers(acq_server, timetagger_server);
// Disable DMA on exit from this function.
struct ScopeGuard {

View File

@ -1,2 +1,2 @@
#define PUZZLEFW_SW_MAJOR 1
#define PUZZLEFW_SW_MINOR 0
#define PUZZLEFW_SW_MINOR 1