// File: redpitaya-puzzlefw/os/src/userspace/data_server.hpp
// (repository viewer metadata: 597 lines, 18 KiB, C++)

/*
* data_server.hpp
*
* Transmit DMA data via TCP server socket.
*
* Joris van Rantwijk 2024
*/
#ifndef PUZZLEFW_DATA_SERVER_H_
#define PUZZLEFW_DATA_SERVER_H_
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <iomanip>
#include <sstream>
#include <stdexcept>
#include <system_error>
#include <thread>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include "logging.hpp"
#include "puzzlefw.hpp"
namespace puzzlefw {
namespace asio = boost::asio;
/**
* Send DMA data via a TCP server socket.
*/
class DataServer
{
public:
/** Maximum block size per TCP send() call. */
static constexpr size_t SEND_MAX_BLOCK = 65536;
/** Sleep until this number of bytes is available in the buffer. */
static constexpr size_t WAIT_BLOCK_SIZE = 4096;
/** Sleep at most this duration if there is insufficient data. */
static constexpr std::chrono::duration WAIT_TIMEOUT =
std::chrono::milliseconds(10);
/** Constructor. */
DataServer(
asio::io_context& io,
DmaWriteStream& dma_stream,
uint16_t tcp_port)
: m_strand(asio::make_strand(io))
, m_acceptor(m_strand)
, m_connection(m_strand)
, m_timer(m_strand)
, m_dma_stream(dma_stream)
, m_tcp_port(tcp_port)
, m_stale_receive(false)
, m_stale_send(false)
, m_send_in_progress(false)
{
if (dma_stream.buffer_segment_size() < 2 * WAIT_BLOCK_SIZE
|| SEND_MAX_BLOCK % dma_stream.dma_alignment() != 0
|| WAIT_BLOCK_SIZE % dma_stream.dma_alignment() != 0) {
throw std::invalid_argument("Invalid buffer segment size");
}
}
// Delete copy constructor and assignment operator.
DataServer(const DataServer&) = delete;
DataServer& operator=(const DataServer&) = delete;
/** Destructor. */
~DataServer()
{
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Disable interrupts.
m_dma_stream.disable_interrupt();
}
/**
* Start the server.
*
* This method must not be called directly while a multi-threaded Asio
* event loop is running. In that case, use "async_start_server()".
*/
void start_server()
{
// If the server is already open, close and re-open it.
if (m_acceptor.is_open()) {
m_acceptor.close();
}
// Open IPv6 TCP socket.
m_acceptor.open(asio::ip::tcp::v6());
// Disable IPV6_V6ONLY to allow IPv4 connections.
m_acceptor.set_option(asio::ip::v6_only(false));
// Enable SO_REUSEADDR.
m_acceptor.set_option(asio::socket_base::reuse_address(true));
// Bind to the TCP port on all interfaces.
asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port);
m_acceptor.bind(addr);
// Start listening for connections.
m_acceptor.listen();
// Asynchronously accept connections.
m_acceptor.async_accept(
[this](auto& ec, auto s){ handle_accept(ec, std::move(s)); });
log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port);
}
/**
* Stop the server and close current connections.
*
* This method must not be called directly while a multi-threaded Asio
* event loop is running. In that case, use "async_stop_server()".
*/
void stop_server()
{
// Disable DMA stream and clear buffer.
m_dma_stream.init();
// Stop accepting connections.
if (m_acceptor.is_open()) {
log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port);
m_acceptor.close();
}
// Close the current connection.
if (m_connection.is_open()) {
log(LOG_INFO, "Closing connection to port %d", m_tcp_port);
m_connection.close();
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
}
/**
* Submit a call to "start_server()" to be executed in the strand
* of this server instance.
*
* After starting the server, the specified completion handler will
* be submitted for execution.
*
* This method may safely be called from any thread.
*/
template <typename Handler>
void async_start_server(Handler&& handler)
{
asio::post(
m_strand,
[this, handler] () mutable {
start_server();
asio::post(m_strand, handler);
});
}
/**
* Submit a call to "stop_server()" to be executed in the strand
* of this server instance.
*
* After stopping the server, the specified completion handler will
* be submitted for execution.
*
* This method may safely be called from any thread.
*/
template <typename Handler>
void async_stop_server(Handler&& handler)
{
asio::post(
m_strand,
[this, handler] () mutable {
stop_server();
asio::post(m_strand, handler);
});
}
/**
* Called when an FPGA interrupt occurs.
*
* The interrupt may or may not be related to our DMA stream.
* Before returning, this function must disable and clear any pending
* interrupt for the DMA stream.
*
* This function may safely be called from any thread.
*/
void handle_interrupt()
{
if (m_dma_stream.is_interrupt_pending()) {
m_dma_stream.disable_interrupt();
asio::post(m_strand,
[this](){ handle_interrupt_in_strand(); });
}
}
private:
/**
* Re-initialize DMA stream and discard stale data from the DMA buffer.
*
* Do not call this while an async_send() operation is in progress.
*/
void discard_stale_data()
{
// Re-init DMA and clear buffer.
m_dma_stream.init();
// The DMA buffer and the internal RAM buffer in the FPGA are now empty.
// But the front-end logic in the FPGA may still contain some stale
// data as well as an overflow record. These will flow into the RAM
// buffer within a microsecond. Let's wait for that.
// Sleeping in an Asio handler is bad, but 1 microsecond is so short
// that it won't hurt.
std::this_thread::sleep_for(std::chrono::microseconds(1));
// Assuming no new data is being produced, the front-end buffer is
// now clean and overflow cleared. A little data may have moved into
// the FPGA RAM buffer. Re-init DMA again to discard that.
m_dma_stream.init();
}
/** Accept completion handler. */
void handle_accept(const boost::system::error_code& error,
asio::ip::tcp::socket conn)
{
if (error) {
// Ignore error due to cancellation.
if (error == asio::error::operation_aborted) {
return;
}
// Certain errors can be triggered by network conditions.
// In these cases, we should retry the accept call.
if (error == asio::error::broken_pipe
|| error == asio::error::connection_aborted
|| error == asio::error::connection_reset
|| error == asio::error::host_unreachable
|| error == asio::error::network_down
|| error == asio::error::network_reset
|| error == asio::error::network_unreachable
|| error == asio::error::timed_out) {
log(LOG_ERROR,
"Accept failed for port %d (%s), retrying",
m_tcp_port,
error.message().c_str());
// Retry accept call.
if (m_acceptor.is_open()) {
m_acceptor.async_accept(
[this](auto& ec, auto s) {
handle_accept(ec, std::move(s));
});
}
return;
}
// Raise exception on unexpected error.
throw std::system_error(error);
}
if (! m_acceptor.is_open()) {
// Oops we were not supposed to accept new connections.
// Apparently this connection sneaked right through before
// closing the acceptor socket.
// Drop the new connection.
log(LOG_INFO, "Dropping new connection to port %d", m_tcp_port);
conn.close();
return;
}
if (m_acceptor.is_open()) {
// Continue accepting connections.
m_acceptor.async_accept(
[this](auto& ec, auto s){ handle_accept(ec, std::move(s)); });
}
if (m_connection.is_open()) {
// We already had an active client connection.
// Drop the old connection and switch to the new client.
log(LOG_INFO,
"Closing current connection to port %d", m_tcp_port);
m_connection.close();
m_stale_receive = true;
m_stale_send = m_send_in_progress;
}
log(LOG_INFO, "New connection to port %d", m_tcp_port);
m_connection = std::move(conn);
if (! m_stale_receive) {
// Setup async receive to detect when the connection is
// closed remotely (or client writes unexpected data).
m_connection.async_receive(
asio::buffer(m_receive_buf, sizeof(m_receive_buf)),
[this](auto& ec, size_t n){ handle_receive(ec, n); });
}
// Clear buffer, then enable DMA stream.
discard_stale_data();
m_dma_stream.set_enabled(true);
// Prepare to send data.
transmit_data(false);
}
/** Receive completion handler. */
void handle_receive(const boost::system::error_code& error, size_t len)
{
if (m_stale_receive) {
// This completion refers to an old, already closed connection.
m_stale_receive = false;
if (m_connection.is_open()) {
// Initiate async receive for the new connection.
m_connection.async_receive(
asio::buffer(m_receive_buf, sizeof(m_receive_buf)),
[this](auto& ec, size_t n){ handle_receive(ec, n); });
}
return;
}
// Either the connection was closed remotely, or a network error
// occurred, or the remote side sent us unexpected data.
// In all of these cases, this connection must be closed.
// Report what happened.
if (error) {
if (error == asio::error::eof) {
log(LOG_INFO,
"Connection to port %d closed by remote", m_tcp_port);
} else {
log(LOG_ERROR,
"Receive failed on port %d (%s), closing connection",
m_tcp_port,
error.message().c_str());
}
} else {
log(LOG_ERROR,
"Received unexpected data on port %d, closing connection",
m_tcp_port);
}
// Close connection.
if (m_connection.is_open()) {
m_connection.close();
m_stale_send = m_send_in_progress;
}
}
/** Send completion handler. */
void handle_send(const boost::system::error_code& error, size_t len)
{
assert(m_send_in_progress);
assert(m_send_size > 0);
m_send_in_progress = false;
if (m_stale_send) {
// This completion refers to an old, already closed connection.
m_stale_send = false;
return;
}
if (error) {
// Report error.
if (error == asio::error::broken_pipe
|| error == asio::error::connection_reset) {
log(LOG_INFO,
"Connection to port %d closed by remote", m_tcp_port);
} else {
log(LOG_ERROR,
"Send failed on port %d (%s), closing connection",
m_tcp_port,
error.message().c_str());
}
// Close the connection.
if (m_connection.is_open()) {
m_connection.close();
m_stale_receive = true;
}
return;
}
if (len < m_send_buffer.size()) {
// Partially completed. Send the rest.
m_send_buffer += len;
m_send_in_progress = true;
m_connection.async_send(
m_send_buffer,
[this](auto& ec, size_t n){ handle_send(ec, n); });
} else {
// Fully completed.
// Release the completed part of the DMA buffer.
m_dma_stream.consume_data(m_send_size);
// Try to send more data.
transmit_data(false);
}
}
/**
* Try to send data from the DMA buffer to the TCP connection.
*
* If insufficient data is available in the buffer, setup
* to be notified when there is sufficient data.
*/
void transmit_data(bool skip_waiting)
{
if (m_send_in_progress) {
// Send already in progress. Do nothing until it completes.
return;
}
if (! m_connection.is_open()) {
// No connection. Do nothing until we get a new connection.
return;
}
// Check amount of data available.
size_t data_available = m_dma_stream.get_data_available();
if (data_available == 0 ||
(data_available < WAIT_BLOCK_SIZE && (! skip_waiting))) {
// Insufficient data available. Setup interrupt.
m_dma_stream.enable_interrupt_on_data_available(WAIT_BLOCK_SIZE);
// Double-check if data are already available.
// This is necessary to prevent a race condition where the data
// becomes available just before the interrupt is enabled.
data_available = m_dma_stream.get_data_available();
if (data_available < WAIT_BLOCK_SIZE) {
// Setup timeout in case interrupt takes too long.
// If timeout occurs, we will send whatever data is available.
m_timer.expires_after(WAIT_TIMEOUT);
m_timer.async_wait(
[this](auto& ec){ handle_timer(ec); });
// Done. We will be notified for interrupt or timeout.
return;
} else {
// We have enough data after all. Cancel the interrupt.
m_dma_stream.disable_interrupt();
}
}
// Get a continuous data block.
void *data;
m_dma_stream.get_data_block(data, data_available);
// Initiate async send.
// Limit the block size so we can release that part of the buffer
// as soon as it completes.
m_send_size = std::min(data_available, SEND_MAX_BLOCK);
m_send_buffer = asio::buffer(data, m_send_size);
m_send_in_progress = true;
m_connection.async_send(
m_send_buffer,
[this](auto& ec, size_t n){ handle_send(ec, n); });
}
/** Timeout handler. */
void handle_timer(const boost::system::error_code& error)
{
if (error) {
// Ignore error due to cancellation.
if (error == asio::error::operation_aborted) {
return;
}
// Raise exception on unexpected error.
throw std::system_error(error);
}
// We get here if a timeout occurs while waiting for an interrupt.
// Disable the interrupt because we no longer care about it.
m_dma_stream.disable_interrupt();
// Try to send some data.
transmit_data(true);
}
/**
* Called when an FPGA interrupt occurs for our DMA stream.
*
* This function runs in the strand of this instance.
*/
void handle_interrupt_in_strand()
{
// Cancel the interrupt timeout.
m_timer.cancel();
// Try to send some data.
transmit_data(true);
}
asio::strand<asio::io_context::executor_type> m_strand;
asio::ip::tcp::acceptor m_acceptor;
asio::ip::tcp::socket m_connection;
asio::steady_timer m_timer;
DmaWriteStream& m_dma_stream;
const uint16_t m_tcp_port;
char m_receive_buf[1];
bool m_stale_receive;
bool m_stale_send;
bool m_send_in_progress;
size_t m_send_size;
asio::const_buffer m_send_buffer;
};
/**
* Monitor DMA status and throw exception if an error occurs.
*/
class DmaErrorMonitor
{
public:
/** Constructor. */
DmaErrorMonitor(
asio::io_context& io,
PuzzleFwDevice& device,
const std::chrono::milliseconds interval)
: m_timer(io)
, m_device(device)
, m_interval(interval)
{
m_timer.expires_after(m_interval);
m_timer.async_wait([this](auto& ec){ handle_timer(ec); });
}
// Delete copy constructor and assignment operator.
DmaErrorMonitor(const DmaErrorMonitor&) = delete;
DmaErrorMonitor& operator=(const DmaErrorMonitor&) = delete;
/**
* Check whether DMA errors occurred.
*
* Throws std::runtime_error() if a DMA error is pending.
*/
void check_dma_error()
{
uint32_t dma_status = m_device.get_dma_status();
if ((dma_status & 0x1e) != 0) {
std::stringstream msg;
msg << "DMA error: status=0x"
<< std::setfill('0') << std::setw(2) << std::hex
<< dma_status;
throw std::runtime_error(msg.str());
}
}
private:
/** Timeout handler. */
void handle_timer(const boost::system::error_code& error)
{
if (error) {
// Ignore error due to cancellation.
if (error == asio::error::operation_aborted) {
return;
}
// Raise exception on unexpected error.
throw std::system_error(error);
}
m_timer.expires_after(m_interval);
m_timer.async_wait([this](auto& ec){ handle_timer(ec); });
check_dma_error();
}
asio::steady_timer m_timer;
PuzzleFwDevice& m_device;
std::chrono::milliseconds m_interval;
};
} // namespace puzzlefw
#endif // PUZZLEFW_DATA_SERVER_H_