Discard data on new TCP connection

This commit is contained in:
Joris van Rantwijk 2024-09-20 23:42:14 +02:00
parent ecefa2dd5a
commit a7802d11e3
3 changed files with 98 additions and 28 deletions

View File

@ -74,11 +74,19 @@ public:
/** Destructor. */ /** Destructor. */
~DataServer() ~DataServer()
{ {
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Disable interrupts. // Disable interrupts.
m_dma_stream.disable_interrupt(); m_dma_stream.disable_interrupt();
} }
/** Start the server. */ /**
* Start the server.
*
* This method must not be called directly while a multi-threaded Asio
* event loop is running. In that case, use "async_start_server()".
*/
void start_server() void start_server()
{ {
// If the server is already open, close and re-open it. // If the server is already open, close and re-open it.
@ -109,9 +117,17 @@ public:
log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port); log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port);
} }
/** Stop the server and close current connections. */ /**
* Stop the server and close current connections.
*
* This method must not be called directly while a multi-threaded Asio
* event loop is running. In that case, use "async_stop_server()".
*/
void stop_server() void stop_server()
{ {
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Stop accepting connections. // Stop accepting connections.
if (m_acceptor.is_open()) { if (m_acceptor.is_open()) {
log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port); log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port);
@ -127,6 +143,46 @@ public:
} }
} }
/**
* Submit a call to "start_server()" to be executed in the strand
* of this server instance.
*
* After starting the server, the specified completion handler will
* be submitted for execution.
*
* This method may safely be called from any thread.
*/
template <typename Handler>
void async_start_server(Handler&& handler)
{
asio::post(
m_strand,
[this, handler] () mutable {
start_server();
asio::post(m_strand, handler);
});
}
/**
* Submit a call to "stop_server()" to be executed in the strand
* of this server instance.
*
* After stopping the server, the specified completion handler will
* be submitted for execution.
*
* This method may safely be called from any thread.
*/
template <typename Handler>
void async_stop_server(Handler&& handler)
{
asio::post(
m_strand,
[this, handler] () mutable {
stop_server();
asio::post(m_strand, handler);
});
}
/** /**
* Called when an FPGA interrupt occurs. * Called when an FPGA interrupt occurs.
* *
@ -134,7 +190,7 @@ public:
* Before returning, this function must disable and clear any pending * Before returning, this function must disable and clear any pending
* interrupt for the DMA stream. * interrupt for the DMA stream.
* *
* This function may be called outside the strand of this instance. * This function may safely be called from any thread.
*/ */
void handle_interrupt() void handle_interrupt()
{ {
@ -224,6 +280,10 @@ private:
[this](auto& ec, size_t n){ handle_receive(ec, n); }); [this](auto& ec, size_t n){ handle_receive(ec, n); });
} }
// Clear buffer, then enable DMA stream.
m_dma_stream.init();
m_dma_stream.set_enabled(true);
// Try to send some data. // Try to send some data.
transmit_data(false); transmit_data(false);
} }
@ -276,14 +336,14 @@ private:
/** Send completion handler. */ /** Send completion handler. */
void handle_send(const boost::system::error_code& error, size_t len) void handle_send(const boost::system::error_code& error, size_t len)
{ {
assert(m_send_in_progress);
assert(m_send_size > 0);
m_send_in_progress = false; m_send_in_progress = false;
if (m_stale_send) { if (m_stale_send) {
// This completion refers to an old, already closed connection. // This completion refers to an old, already closed connection.
m_stale_send = false; m_stale_send = false;
// Discard the remaining data that was part of this send.
send_completed();
return; return;
} }
@ -306,8 +366,6 @@ private:
m_stale_receive = true; m_stale_receive = true;
} }
// Discard the remaining data that was part of this send.
send_completed();
return; return;
} }
@ -320,22 +378,13 @@ private:
[this](auto& ec, size_t n){ handle_send(ec, n); }); [this](auto& ec, size_t n){ handle_send(ec, n); });
} else { } else {
// Fully completed. // Fully completed.
send_completed();
}
}
/** Called when a send operation is fully completed. */
void send_completed()
{
assert(! m_send_in_progress);
assert(m_send_size > 0);
// Release the completed part of the DMA buffer. // Release the completed part of the DMA buffer.
m_dma_stream.consume_data(m_send_size); m_dma_stream.consume_data(m_send_size);
// Try to send more data. // Try to send more data.
transmit_data(false); transmit_data(false);
} }
}
/** /**
* Try to send data from the DMA buffer to the TCP connection. * Try to send data from the DMA buffer to the TCP connection.

View File

@ -165,10 +165,9 @@ void run_data_server(puzzlefw::PuzzleFwDevice& device)
device.clear_dma_errors(); device.clear_dma_errors();
device.set_dma_enabled(true); device.set_dma_enabled(true);
// Enable data servers.
acq_server.start_server(); acq_server.start_server();
timetagger_server.start_server(); timetagger_server.start_server();
acq_stream.set_enabled(true);
timetagger_stream.set_enabled(true);
log(LOG_INFO, "Running, press Ctrl-C to stop"); log(LOG_INFO, "Running, press Ctrl-C to stop");
io.run(); io.run();

View File

@ -715,8 +715,8 @@ public:
m_device.write_reg(m_reg.addr_end, m_buf_end); m_device.write_reg(m_reg.addr_end, m_buf_end);
m_device.write_reg(m_reg.addr_limit, m_buf_end - POINTER_MARGIN); m_device.write_reg(m_reg.addr_limit, m_buf_end - POINTER_MARGIN);
// Initialize DMA stream (reset write pointer to start of segment). // Initialize DMA stream and reset write pointer.
m_device.write_reg(m_reg.dma_ctrl, 2); init();
} }
// Delete copy constructor and assignment operator. // Delete copy constructor and assignment operator.
@ -726,8 +726,7 @@ public:
/** Disable DMA stream. Remaining data in buffer is discarded. */ /** Disable DMA stream. Remaining data in buffer is discarded. */
~DmaWriteStream() ~DmaWriteStream()
{ {
// Disable DMA stream and re-initialize. // Disable and reset DMA stream.
m_device.write_reg(m_reg.dma_ctrl, 0);
m_device.write_reg(m_reg.dma_ctrl, 2); m_device.write_reg(m_reg.dma_ctrl, 2);
// Disable DMA stream interrupt and clear pending interrupt. // Disable DMA stream interrupt and clear pending interrupt.
@ -752,6 +751,29 @@ public:
m_device.write_reg(m_reg.dma_ctrl, enable ? 1 : 0); m_device.write_reg(m_reg.dma_ctrl, enable ? 1 : 0);
} }
/** Disable DMA stream and discard all data from the buffer. */
void init()
{
// Disable DMA stream.
m_device.write_reg(m_reg.dma_ctrl, 0);
// Wait until DMA completed or timeout.
for (int i = 0; i < 16; i++) {
if ((m_device.read_reg(m_reg.dma_status) & 1) == 0) {
break;
}
}
// Reset write pointer, clear buffer.
m_device.write_reg(m_reg.dma_ctrl, 2);
// Disable and clear interrupt.
disable_interrupt();
// Reset read pointer.
m_read_pointer = m_buf_start;
}
/** Return the number of bytes available in the DMA buffer. */ /** Return the number of bytes available in the DMA buffer. */
size_t get_data_available() size_t get_data_available()
{ {