From ea5d3c3a1d78ca2610f2a75784d0b37f79dee526 Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Fri, 27 Sep 2024 19:30:52 +0200 Subject: [PATCH] Start working on remote control server --- sw/.gitignore | 1 + sw/src/userspace/Makefile | 12 +- sw/src/userspace/data_server.hpp | 114 +-- sw/src/userspace/interrupt_manager.hpp | 4 +- sw/src/userspace/puzzlecmd.cpp | 4 +- sw/src/userspace/remotectl.cpp | 1096 ++++++++++++++++++++++++ sw/src/userspace/version.hpp | 2 + 7 files changed, 1146 insertions(+), 87 deletions(-) create mode 100644 sw/src/userspace/remotectl.cpp create mode 100644 sw/src/userspace/version.hpp diff --git a/sw/.gitignore b/sw/.gitignore index 5a28233..a40f733 100644 --- a/sw/.gitignore +++ b/sw/.gitignore @@ -1,5 +1,6 @@ devicetree/devicetree.dtb buildroot_overlay/opt/puzzlefw/bin/puzzlecmd +buildroot_overlay/opt/puzzlefw/bin/remotectl buildroot_overlay/opt/puzzlefw/driver/ downloads/ buildroot-2023.02.8/ diff --git a/sw/src/userspace/Makefile b/sw/src/userspace/Makefile index 2fb39d7..adbd31a 100644 --- a/sw/src/userspace/Makefile +++ b/sw/src/userspace/Makefile @@ -8,17 +8,19 @@ CFLAGS = -Wall -O2 CXXFLAGS = -std=c++17 -Wall -Wno-psabi -O2 .PHONY: all -all: puzzlecmd +all: puzzlecmd remotectl puzzlecmd: puzzlecmd.o puzzlefw.o $(CXX) -o $@ $(LDFLAGS) $^ -puzzlecmd.o: puzzlecmd.cpp logging.hpp puzzlefw.hpp interrupt_manager.hpp data_server.hpp -puzzlefw.o: puzzlefw.cpp puzzlefw.hpp +remotectl: remotectl.o puzzlefw.o + $(CXX) -o $@ $(LDFLAGS) $^ -testje: testje.c +puzzlecmd.o: puzzlecmd.cpp logging.hpp puzzlefw.hpp interrupt_manager.hpp data_server.hpp +remotectl.o: remotectl.cpp logging.hpp puzzlefw.hpp interrupt_manager.hpp data_server.hpp version.hpp +puzzlefw.o: puzzlefw.cpp puzzlefw.hpp .PHONY: clean clean: - $(RM) -f -- puzzlecmd testje *.o + $(RM) -f -- puzzlecmd remotectl *.o diff --git a/sw/src/userspace/data_server.hpp b/sw/src/userspace/data_server.hpp index caa8813..0630bb5 100644 --- a/sw/src/userspace/data_server.hpp +++ b/sw/src/userspace/data_server.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -82,11 +83,17 @@ public: m_dma_stream.disable_interrupt(); } + /** Return the Asio strand that runs all handlers for this object. */ + asio::strand get_executor() + { + return m_strand; + } + /** * Start the server. * - * This method must not be called directly while a multi-threaded Asio - * event loop is running. In that case, use "async_start_server()". + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". */ void start_server() { @@ -113,7 +120,7 @@ public: // Asynchronously accept connections. m_acceptor.async_accept( - [this](auto& ec, auto s){ handle_accept(ec, std::move(s)); }); + [this](auto ec, auto s){ handle_accept(ec, std::move(s)); }); log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port); } @@ -121,8 +128,8 @@ public: /** * Stop the server and close current connections. * - * This method must not be called directly while a multi-threaded Asio - * event loop is running. In that case, use "async_stop_server()". + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". */ void stop_server() { @@ -144,46 +151,6 @@ public: } } - /** - * Submit a call to "start_server()" to be executed in the strand - * of this server instance. - * - * After starting the server, the specified completion handler will - * be submitted for execution. - * - * This method may safely be called from any thread. - */ - template - void async_start_server(Handler&& handler) - { - asio::post( - m_strand, - [this, handler] () mutable { - start_server(); - asio::post(m_strand, handler); - }); - } - - /** - * Submit a call to "stop_server()" to be executed in the strand - * of this server instance. - * - * After stopping the server, the specified completion handler will - * be submitted for execution. - * - * This method may safely be called from any thread. - */ - template - void async_stop_server(Handler&& handler) - { - asio::post( - m_strand, - [this, handler] () mutable { - stop_server(); - asio::post(m_strand, handler); - }); - } - /** * Called when an FPGA interrupt occurs. * @@ -206,7 +173,7 @@ private: /** * Re-initialize DMA stream and discard stale data from the DMA buffer. * - * Do not call this while an async_send() operation is in progress. + * Do not call this while an async_write() operation is in progress. */ void discard_stale_data() { @@ -228,7 +195,7 @@ private: } /** Accept completion handler. */ - void handle_accept(const boost::system::error_code& error, + void handle_accept(boost::system::error_code error, asio::ip::tcp::socket conn) { if (error) { @@ -256,7 +223,7 @@ private: // Retry accept call. if (m_acceptor.is_open()) { m_acceptor.async_accept( - [this](auto& ec, auto s) { + [this](auto ec, auto s) { handle_accept(ec, std::move(s)); }); } @@ -281,7 +248,7 @@ private: if (m_acceptor.is_open()) { // Continue accepting connections. m_acceptor.async_accept( - [this](auto& ec, auto s){ handle_accept(ec, std::move(s)); }); + [this](auto ec, auto s){ handle_accept(ec, std::move(s)); }); } if (m_connection.is_open()) { @@ -302,7 +269,7 @@ private: // closed remotely (or client writes unexpected data). m_connection.async_receive( asio::buffer(m_receive_buf, sizeof(m_receive_buf)), - [this](auto& ec, size_t n){ handle_receive(ec, n); }); + [this](auto ec, size_t n){ handle_receive(ec, n); }); } // Clear buffer, then enable DMA stream. @@ -314,7 +281,7 @@ private: } /** Receive completion handler. */ - void handle_receive(const boost::system::error_code& error, size_t len) + void handle_receive(boost::system::error_code error, size_t len) { if (m_stale_receive) { // This completion refers to an old, already closed connection. @@ -324,7 +291,7 @@ private: // Initiate async receive for the new connection. m_connection.async_receive( asio::buffer(m_receive_buf, sizeof(m_receive_buf)), - [this](auto& ec, size_t n){ handle_receive(ec, n); }); + [this](auto ec, size_t n){ handle_receive(ec, n); }); } return; @@ -359,10 +326,9 @@ private: } /** Send completion handler. */ - void handle_send(const boost::system::error_code& error, size_t len) + void handle_send(boost::system::error_code error, size_t len) { assert(m_send_in_progress); - assert(m_send_size > 0); m_send_in_progress = false; @@ -394,21 +360,13 @@ private: return; } - if (len < m_send_buffer.size()) { - // Partially completed. Send the rest. - m_send_buffer += len; - m_send_in_progress = true; - m_connection.async_send( - m_send_buffer, - [this](auto& ec, size_t n){ handle_send(ec, n); }); - } else { - // Fully completed. - // Release the completed part of the DMA buffer. - m_dma_stream.consume_data(m_send_size); + // Send operation completed. + // Release the completed part of the DMA buffer. + assert(len == m_send_buffer.size()); + m_dma_stream.consume_data(m_send_buffer.size()); - // Try to send more data. - transmit_data(false); - } + // Try to send more data. + transmit_data(false); } /** @@ -447,7 +405,7 @@ private: // If timeout occurs, we will send whatever data is available. m_timer.expires_after(WAIT_TIMEOUT); m_timer.async_wait( - [this](auto& ec){ handle_timer(ec); }); + [this](auto ec){ handle_timer(ec); }); // Done. We will be notified for interrupt or timeout. return; @@ -467,16 +425,15 @@ private: // Initiate async send. // Limit the block size so we can release that part of the buffer // as soon as it completes. - m_send_size = std::min(data_available, SEND_MAX_BLOCK); - m_send_buffer = asio::buffer(data, m_send_size); + size_t block_size = std::min(data_available, SEND_MAX_BLOCK); + m_send_buffer = asio::buffer(data, block_size); m_send_in_progress = true; - m_connection.async_send( - m_send_buffer, - [this](auto& ec, size_t n){ handle_send(ec, n); }); + asio::async_write(m_connection, m_send_buffer, + [this](auto ec, size_t n){ handle_send(ec, n); }); } /** Timeout handler. */ - void handle_timer(const boost::system::error_code& error) + void handle_timer(boost::system::error_code error) { if (error) { // Ignore error due to cancellation. @@ -520,7 +477,6 @@ private: bool m_stale_receive; bool m_stale_send; bool m_send_in_progress; - size_t m_send_size; asio::const_buffer m_send_buffer; }; @@ -541,7 +497,7 @@ public: , m_interval(interval) { m_timer.expires_after(m_interval); - m_timer.async_wait([this](auto& ec){ handle_timer(ec); }); + m_timer.async_wait([this](auto ec){ handle_timer(ec); }); } // Delete copy constructor and assignment operator. @@ -567,7 +523,7 @@ public: private: /** Timeout handler. */ - void handle_timer(const boost::system::error_code& error) + void handle_timer(boost::system::error_code error) { if (error) { // Ignore error due to cancellation. @@ -580,7 +536,7 @@ private: } m_timer.expires_after(m_interval); - m_timer.async_wait([this](auto& ec){ handle_timer(ec); }); + m_timer.async_wait([this](auto ec){ handle_timer(ec); }); check_dma_error(); } diff --git a/sw/src/userspace/interrupt_manager.hpp b/sw/src/userspace/interrupt_manager.hpp index 49f9449..9ef8506 100644 --- a/sw/src/userspace/interrupt_manager.hpp +++ b/sw/src/userspace/interrupt_manager.hpp @@ -74,7 +74,7 @@ public: private: /** Called when an FPGA interrupt occurs. */ - void handle_wait(const boost::system::error_code& error) + void handle_wait(boost::system::error_code error) { if (error) { // Ignore error due to cancellation. @@ -100,7 +100,7 @@ private: // Wait for next interrupt. m_uio_fd.async_wait(asio::posix::descriptor_base::wait_read, - [this](auto& ec){ handle_wait(ec); }); + [this](auto ec){ handle_wait(ec); }); } PuzzleFwDevice& m_device; diff --git a/sw/src/userspace/puzzlecmd.cpp b/sw/src/userspace/puzzlecmd.cpp index 2704aeb..6e08be4 100644 --- a/sw/src/userspace/puzzlecmd.cpp +++ b/sw/src/userspace/puzzlecmd.cpp @@ -2,6 +2,8 @@ * puzzlecmd.cpp * * Command-line program to test PuzzleFW firmware. + * + * Joris van Rantwijk 2024 */ #include @@ -120,7 +122,7 @@ void run_data_server(puzzlefw::PuzzleFwDevice& device) // Catch Ctrl-C for controlled shut down. asio::signal_set signals(io, SIGINT); signals.async_wait( - [&io](auto& ec, int sig) { + [&io](auto ec, int sig) { log(LOG_INFO, "Got SIGINT, stopping server"); io.stop(); }); diff --git a/sw/src/userspace/remotectl.cpp b/sw/src/userspace/remotectl.cpp new file mode 100644 index 0000000..784ed39 --- /dev/null +++ b/sw/src/userspace/remotectl.cpp @@ -0,0 +1,1096 @@ +/* + * remotectl.cpp + * + * Remote control server. + * + * Joris van Rantwijk 2024 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "puzzlefw.hpp" +#include "logging.hpp" +#include "interrupt_manager.hpp" +#include "data_server.hpp" +#include "version.hpp" + +using namespace puzzlefw; +namespace asio = boost::asio; + + +/** Return true if the string ends with the suffix. */ +bool str_ends_with(const std::string& value, const std::string& suffix) +{ + return (value.size() >= suffix.size()) && + std::equal(value.end() - suffix.size(), value.end(), suffix.begin()); +} + + +/** Return lower-case copy of string. */ +std::string str_to_lower(const std::string& value) +{ + std::string result = value; + for (char& c : result) { + c = tolower(c); + } + return result; +} + + +// Forward declaration. +class ControlServer; + + +/** + * The CommandHandler handles commands from remote clients. + */ +class CommandHandler +{ +public: + // IDN response fields + static constexpr std::string_view IDN_MANUFACTURER = "Jigsaw"; + static constexpr std::string_view IDN_MODEL = "PuzzleFw"; + + enum ExitStatus { + EXIT_ERROR = 1, + EXIT_HALT = 2, + EXIT_REBOOT = 3 + }; + + enum RangeSpec { + RANGE_NONE = 0, + RANGE_LO = 1, + RANGE_HI = 2 + }; + + struct CommandEnvironment { + int channel; + RangeSpec range_spec; + bool raw_flag; + bool saved_flag; + }; + + /** Constructor. */ + CommandHandler(asio::io_context& io, + PuzzleFwDevice& device, + const std::string& serial_number) + : m_io(io) + , m_strand(asio::make_strand(io)) + , m_device(device) + , m_serial_number(serial_number) + , m_control_server(nullptr) + , m_shutting_down(false) + , m_exit_status(EXIT_ERROR) + { } + + // Delete copy constructor and assignment operator. + CommandHandler(const CommandHandler&) = delete; + CommandHandler& operator=(const CommandHandler&) = delete; + + /** Register the control server with the command handler. */ + void set_control_server(ControlServer& control_server) + { + m_control_server = &control_server; + } + + /** Register a data server with the command handler. */ + void add_data_server(DataServer& data_server) + { + m_data_servers.push_back(&data_server); + } + + /** Return the exit status of the command handler. */ + ExitStatus exit_status() const + { + return m_exit_status; + } + + /** Return the Asio strand that runs all handlers for this object. */ + asio::strand get_executor() + { + return m_strand; + } + + /** + * Handle a command. + * + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". + * + * Returns: + * Response without line terminator, + * or an empty string if no response must be sent. + */ + std::string handle_command(const std::string& command) + { + if (m_shutting_down) { + // The server is shutting down. + // Ignore new commands and return an empty string to indicate + // that no response must be sent. + return std::string(); + } + + // Split words. + std::vector tokens = parse_command(command); + + // Ignore empty command line without response. + if (tokens.empty()) { + return std::string(); + } + + // Convert command to lower case. + std::string action = str_to_lower(tokens.front()); + + CommandEnvironment env; + + // Extract channel specifier. + env.channel = 0; + if (action.size() > 7 + && action.compare(0, 6, "ain:ch") == 0 + && action[7] == ':') { + char channel_digit = action[6]; + if (channel_digit < '1' || channel_digit > '4') { + return err_unknown_command(); + } + env.channel = channel_digit - '1'; // 0-based channel index + action[7] = 'N'; // mark channel index + } + + // Extract range specifier. + env.range_spec = RANGE_NONE; + { + size_t p = action.rfind(":lo"); + if (p != action.npos + && (p + 3 == action.size() || action[p+3] == '?')) { + env.range_spec = RANGE_LO; + action[p + 1] = 'R'; // mark range specifier + action[p + 2] = 'R'; + } else { + p = action.rfind(":hi"); + if (p != action.npos + && (p + 3 == action.size() || action[p+3] == '?')) { + env.range_spec = RANGE_HI; + action[p + 1] = 'R'; // mark range specifier + action[p + 2] = 'R'; + } + } + } + + // Detect :RAW flag. + env.raw_flag = str_ends_with(action, ":raw?"); + + // Detect :SAVED flag. + env.saved_flag = str_ends_with(action, ":saved") + || str_ends_with(action, ":saved?"); + + // Handle commands without argument. + { + const auto it = command_table_no_args.find(action); + if (it != command_table_no_args.end()) { + if (tokens.size() != 1) { + return err_unexpected_argument(); + } + auto func = it->second; + return (this->*func)(env); + } + } + + // Handle commands with one argument. + { + const auto it = command_table_one_arg.find(action); + if (it != command_table_one_arg.end()) { + if (tokens.size() < 2) { + return err_missing_argument(); + } + if (tokens.size() > 2) { + return err_unexpected_argument(); + } + auto func = it->second; + return (this->*func)(env, tokens[1]); + } + } + + // Handle command IPCFG. + if (action == "ipcfg" || action == "ipcfg:saved") { + if (tokens.size() < 3) { + return err_missing_argument(); + } + if (tokens.size() > 4) { + return err_unexpected_argument(); + } + const std::string& gw = (tokens.size() > 3) ? tokens[3] : ""; + return cmd_ipcfg(env, tokens[1], tokens[2], gw); + } + + return err_unknown_command(); + } + +private: + // TODO -- shutdown flow + + std::string err_unknown_command() const + { + return "ERROR Unknown command"; + } + + std::string err_unexpected_argument() const + { + return "ERROR Unexpected argument"; + } + + std::string err_missing_argument() const + { + return "ERROR Missing argument"; + } + + /** Parse command to a list of white space separated tokens. */ + std::vector parse_command(const std::string& command) + { + static constexpr std::string_view space_chars = " \t\n\v\f\r"; + std::vector tokens; + + for (size_t p = 0; p < command.size(); ) { + + p = command.find_first_not_of(space_chars, p); + if (p == command.npos) { + break; + } + + size_t q = command.find_first_of(space_chars, p); + if (q == command.npos) { + q = command.size(); + } + tokens.emplace_back(command, p, q - p); + + p = q; + } + + return tokens; + } + + /** Handle command *IDN? */ + std::string qry_idn(CommandEnvironment env) + { + VersionInfo fw_version = m_device.get_version_info(); + std::stringstream idn; + idn << IDN_MANUFACTURER; + idn << "," << IDN_MODEL; + idn << "," << m_serial_number; + idn << ","; + idn << "FW-" << (int)fw_version.major_version; + idn << "." << (int)fw_version.minor_version; + idn << "/SW-" << PUZZLEFW_SW_MAJOR << "." << PUZZLEFW_SW_MINOR; + return idn.str(); + } + + /** Handle command TIMESTAMP? */ + std::string qry_timestamp(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHANNELS:COUNT? */ + std::string qry_channels_count(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHANNELS:ACTIVE? */ + std::string qry_channels_active(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:RANGE? */ + std::string qry_channel_range(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:OFFS[:range]? */ + std::string qry_channel_offs(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:GAIN[:range]? */ + std::string qry_channel_gain(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:SAMPLE[:RAW]? */ + std::string qry_channel_sample(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:MINMAX[:RAW]? */ + std::string qry_channel_minmax(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE? */ + std::string qry_srate(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE:DIVISOR? */ + std::string qry_srate_divisor(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE:MODE? */ + std::string qry_srate_mode(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE:GAIN? */ + std::string qry_srate_gain(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:NSAMPLES? */ + std::string qry_nsamples(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:TRIGGER:MODE? */ + std::string qry_trigger_mode(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:TRIGGER:DELAY? */ + std::string qry_trigger_delay(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:TRIGGER:STATUS? */ + std::string qry_trigger_status(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command TT:SAMPLE? */ + std::string qry_tt_sample(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command TT:CHANNEL:MASK? */ + std::string qry_tt_channel_mask(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command IPCFG[:SAVED]? */ + std::string qry_ipcfg(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command RESET */ + std::string cmd_reset(CommandEnvironment env) + { + // TODO -- reset + return "OK"; + } + + /** Handle command HALT */ + std::string cmd_halt(CommandEnvironment env) + { + log(LOG_INFO, "Got command HALT"); + + m_exit_status = EXIT_HALT; + m_shutting_down = true; + m_io.stop(); + + // No response while shutting down. + // Response delivery would not be reliable while the socket is closing. + return std::string(); + } + + /** Handle command REBOOT */ + std::string cmd_reboot(CommandEnvironment env) + { + log(LOG_INFO, "Got command REBOOT"); + + m_exit_status = EXIT_REBOOT; + m_shutting_down = true; + m_io.stop(); + + // No response while shutting down. + // Response delivery would not be reliable while the socket is closing. + return std::string(); + } + + /** Handle command AIN:MINMAX:CLEAR */ + std::string cmd_minmax_clear(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command TT:MARK */ + std::string cmd_tt_mark(CommandEnvironment env) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHANNELS:ACTIVE */ + std::string cmd_channels_active(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:RANGE */ + std::string cmd_channel_range(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:OFFS[:range] */ + std::string cmd_channel_offs(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:CHn:GAIN[:range] */ + std::string cmd_channel_gain(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE */ + std::string cmd_srate(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE:DIVISOR */ + std::string cmd_srate_divisor(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:SRATE:MODE */ + std::string cmd_srate_mode(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:NSAMPLES */ + std::string cmd_nsamples(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:TRIGGER:MODE */ + std::string cmd_trigger_mode(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command AIN:TRIGGER:DELAY */ + std::string cmd_trigger_delay(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command TT:CHANNEL:MASK */ + std::string cmd_tt_channel_mask(CommandEnvironment env, + const std::string& arg) + { + // TODO + return "ERROR"; + } + + /** Handle command IPCFG */ + std::string cmd_ipcfg(CommandEnvironment env, + const std::string& ipaddr, + const std::string& netmask, + const std::string& gateway) + { + // TODO + return "OK"; + } + + static const inline std::map< + std::string, + std::string (CommandHandler::*)(CommandEnvironment)> + command_table_no_args = { + { "*idn?", &CommandHandler::qry_idn }, + { "timestamp?", &CommandHandler::qry_timestamp }, + { "ain:channels:count?", &CommandHandler::qry_channels_count }, + { "ain:channels:active?", &CommandHandler::qry_channels_active }, + { "ain:chN:range?", &CommandHandler::qry_channel_range }, + { "ain:chN:offs?", &CommandHandler::qry_channel_offs }, + { "ain:chN:offs:RR?", &CommandHandler::qry_channel_offs }, + { "ain:chN:gain?", &CommandHandler::qry_channel_gain }, + { "ain:chN:gain:RR?", &CommandHandler::qry_channel_gain }, + { "ain:chN:sample?", &CommandHandler::qry_channel_sample }, + { "ain:chN:sample:raw?", &CommandHandler::qry_channel_sample }, + { "ain:chN:minman?", &CommandHandler::qry_channel_minmax }, + { "ain:chN:manmax:raw?", &CommandHandler::qry_channel_minmax }, + { "ain:srate?", &CommandHandler::qry_srate }, + { "ain:srate:divisor?", &CommandHandler::qry_srate_divisor }, + { "ain:srate:mode?", &CommandHandler::qry_srate_mode }, + { "ain:srate:gain?", &CommandHandler::qry_srate_gain }, + { "ain:nsamples?", &CommandHandler::qry_nsamples }, + { "ain:trigger:mode?", &CommandHandler::qry_trigger_mode }, + { "ain:trigger:delay?", &CommandHandler::qry_trigger_delay }, + { "ain:trigger:status?", &CommandHandler::qry_trigger_status }, + { "tt:sample?", &CommandHandler::qry_tt_sample }, + { "tt:channel:mask?", &CommandHandler::qry_tt_channel_mask }, + { "ipcfg?", &CommandHandler::qry_ipcfg }, + { "ipcfg:saved?", &CommandHandler::qry_ipcfg }, + { "reset", &CommandHandler::cmd_reset }, + { "halt", &CommandHandler::cmd_halt }, + { "reboot", &CommandHandler::cmd_reboot }, + { "ain:minmax:clear", &CommandHandler::cmd_minmax_clear }, + { "tt:mark", &CommandHandler::cmd_tt_mark } + }; + + static const inline std::map< + std::string, + std::string (CommandHandler::*)(CommandEnvironment, const std::string&)> + command_table_one_arg = { + { "ain:channels:active", &CommandHandler::cmd_channels_active }, + { "ain:chN:range", &CommandHandler::cmd_channel_range }, + { "ain:chN:offs", &CommandHandler::cmd_channel_offs }, + { "ain:chN:offs:RR", &CommandHandler::cmd_channel_offs }, + { "ain:chN:gain", &CommandHandler::cmd_channel_gain }, + { "ain:chN:gain:RR", &CommandHandler::cmd_channel_gain }, + { "ain:srate", &CommandHandler::cmd_srate }, + { "ain:srate:divisor", &CommandHandler::cmd_srate_divisor }, + { "ain:srate:mode", &CommandHandler::cmd_srate_mode }, + { "ain:nsamples", &CommandHandler::cmd_nsamples }, + { "ain:trigger:mode", &CommandHandler::cmd_trigger_mode }, + { "ain:trigger:delay", &CommandHandler::cmd_trigger_delay }, + { "tt:channel:mask", &CommandHandler::cmd_tt_channel_mask } + }; + + asio::io_context& m_io; + asio::strand m_strand; + PuzzleFwDevice& m_device; + std::string m_serial_number; + ControlServer* m_control_server; + std::vector m_data_servers; + bool m_shutting_down; + ExitStatus m_exit_status; +}; + + +/** + * Manage TCP connections for remote control. + */ +class ControlServer +{ +public: + + /** Constructor. */ + ControlServer(asio::io_context& io, + CommandHandler& command_handler, + uint16_t tcp_port) + : m_strand(asio::make_strand(io)) + , m_acceptor(m_strand) + , m_command_handler(command_handler) + , m_tcp_port(tcp_port) + { } + + // Delete copy constructor and assignment operator. + ControlServer(const ControlServer&) = delete; + ControlServer& operator=(const ControlServer&) = delete; + + /** + * Start the server. + * + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". + */ + void start_server() + { + // If the server is already open, close and re-open it. + if (m_acceptor.is_open()) { + m_acceptor.close(); + } + + // Drop all connections. + for (auto& conn : m_connections) { + if (conn->sock.is_open()) { + conn->sock.close(); + } + } + m_connections.clear(); + + // Open IPv6 TCP socket. + m_acceptor.open(asio::ip::tcp::v6()); + + // Disable IPV6_V6ONLY to allow IPv4 connections. + m_acceptor.set_option(asio::ip::v6_only(false)); + + // Enable SO_REUSEADDR. + m_acceptor.set_option(asio::socket_base::reuse_address(true)); + + // Bind to the TCP port on all interfaces. + asio::ip::tcp::endpoint addr(asio::ip::address_v6::any(), m_tcp_port); + m_acceptor.bind(addr); + + // Start listening for connections. + m_acceptor.listen(); + + // Asynchronously accept connections. + m_acceptor.async_accept( + [this](auto ec, auto s){ handle_accept(ec, std::move(s)); }); + + log(LOG_INFO, "Ready for TCP connections to port %d", m_tcp_port); + } + + /** + * Stop the server and close all connections. + * + * If a multi-threaded Asio event loop is running, this method may only + * be called through the strand returned by "get_executor()". + */ + void stop_server() + { + // Stop accepting connections. + if (m_acceptor.is_open()) { + log(LOG_INFO, "Closing TCP server on port %d", m_tcp_port); + m_acceptor.close(); + } + + // Drop all connections. + for (auto& conn : m_connections) { + if (conn->sock.is_open()) { + conn->sock.close(); + } + } + m_connections.clear(); + } + +private: + /** + * Each active TCP connection is represented by an instance of Connection. + */ + struct Connection { + asio::ip::tcp::socket sock; + asio::streambuf recv_buf; + std::string send_buf; + + Connection(asio::ip::tcp::socket&& s) + : sock(std::move(s)), recv_buf(4096) + { } + }; + + /** Accept completion handler. */ + void handle_accept(boost::system::error_code error, + asio::ip::tcp::socket sock) + { + if (error) { + // Ignore error due to cancellation. + if (error == asio::error::operation_aborted) { + return; + } + + // Certain errors can be triggered by network conditions. + // In these cases, we should retry the accept call. + if (error == asio::error::broken_pipe + || error == asio::error::connection_aborted + || error == asio::error::connection_reset + || error == asio::error::host_unreachable + || error == asio::error::network_down + || error == asio::error::network_reset + || error == asio::error::network_unreachable + || error == asio::error::timed_out) { + + log(LOG_ERROR, + "Accept failed for port %d (%s), retrying", + m_tcp_port, + error.message().c_str()); + + // Retry accept call. + if (m_acceptor.is_open()) { + m_acceptor.async_accept( + [this](auto& ec, auto s) { + handle_accept(ec, std::move(s)); + }); + } + + return; + } + + // Raise exception on unexpected error. + throw std::system_error(error); + } + + if (! m_acceptor.is_open()) { + // Oops we were not supposed to accept new connections. + // Apparently this connection sneaked right through before + // closing the acceptor socket. + // Drop the new connection. + log(LOG_INFO, "Dropping new connection to port %d", m_tcp_port); + sock.close(); + return; + } + + if (m_acceptor.is_open()) { + // Continue accepting connections. + m_acceptor.async_accept( + [this](auto ec, auto s){ handle_accept(ec, std::move(s)); }); + } + + log(LOG_INFO, "New connection to port %d", m_tcp_port); + + // Create Connection instance. + std::shared_ptr conn = + std::make_shared(std::move(sock)); + m_connections.push_back(conn); + + // Iniate receive opereration. + receive_command(conn); + } + + /** Receive completion handler. */ + void handle_receive(std::shared_ptr conn, + boost::system::error_code error, + size_t len) + { + if (! conn->sock.is_open()) { + // This connection is already closed. Ignore further events. + return; + } + + if (error) { + // Report error. + if (error == asio::error::eof) { + log(LOG_INFO, + "Connection to port %d closed by remote", m_tcp_port); + } else { + log(LOG_ERROR, + "Receive failed on port %d (%s), closing connection", + m_tcp_port, + error.message().c_str()); + } + + // Close this connection. + conn->sock.close(); + return; + } + + // Extract a command up to newline. + std::istream is(&conn->recv_buf); + std::string command; + std::getline(is, command, '\n'); + + // Handle command. + process_command(conn, command); + } + + /** Send completion handler. */ + void handle_send(std::shared_ptr conn, + boost::system::error_code error, + size_t len) + { + if (! conn->sock.is_open()) { + // This connection is already closed. Ignore further events. + return; + } + + if (error) { + // Report error. + if (error == asio::error::broken_pipe + || error == asio::error::connection_reset) { + log(LOG_INFO, + "Connection to port %d closed by remote", m_tcp_port); + } else { + log(LOG_ERROR, + "Send failed on port %d (%s), closing connection", + m_tcp_port, + error.message().c_str()); + } + + // Close the connection. + conn->sock.close(); + return; + } + + // Discard send buffer. + assert(len == conn->send_buf.size()); + conn->send_buf.clear(); + + // Iniiate receive operation for next command. + receive_command(conn); + } + + /** Iniate asynchronous receiving of a command from the connection. */ + void receive_command(std::shared_ptr conn) + { + if (! conn->sock.is_open()) { + // The connection is already closed. Don't mess with it. + return; + } + + // Initiate receive operation. + asio::async_read_until(conn->sock, conn->recv_buf, '\n', + [this,conn](auto ec, size_t n) { handle_receive(conn, ec, n); }); + } + + /** Process a command received through a connection. */ + void process_command(std::shared_ptr conn, + const std::string& command) + { + // Post an event to the strand of the command handler. + asio::post( + m_command_handler.get_executor(), + [this,conn,command]() { + // This code runs in the command handler strand. + // Tell the command handler to run the command. + auto response = m_command_handler.handle_command(command); + + // Post the response back to our own strand. + asio::post( + m_strand, + [this,conn,response]() { + // This code runs in our own strand. + // Handle the response. + process_response(conn, response); + }); + }); + } + + /** Initiate asynchronous sending of a response to the connection. */ + void process_response(std::shared_ptr conn, + const std::string& response) + { + if (! conn->sock.is_open()) { + // The connection is already closed. Don't mess with it. + return; + } + + if (response.empty()) { + // Empty response string means don't send a response. + // Initiate a new receive operation. + receive_command(conn); + return; + } + + // Check that the send buffer is empty. + assert(conn->send_buf.empty()); + + // Put the response message into the send buffer. + conn->send_buf = response; + conn->send_buf.push_back('\n'); + + // Start asynchronous send. + asio::async_write(conn->sock, asio::buffer(conn->send_buf), + [this,conn](auto ec, size_t n) { handle_send(conn, ec, n); }); + } + + asio::strand m_strand; + asio::ip::tcp::acceptor m_acceptor; + CommandHandler& m_command_handler; + const uint16_t m_tcp_port; + std::vector> m_connections; +}; + + +/** Run remote control server. */ +int run_remote_control_server( + puzzlefw::PuzzleFwDevice& device, + const std::string& serial_number) +{ + namespace asio = boost::asio; + using namespace puzzlefw; + + asio::io_context io; + + // Catch Ctrl-C for controlled shut down. + asio::signal_set signals(io, SIGINT); + signals.async_wait( + [&io](auto& ec, int sig) { + log(LOG_INFO, "Got SIGINT, stopping server"); + io.stop(); + }); + + // Reserve 3/4 of the DMA buffer for analog acquisition data. + // Reserve 1/4 of the DMA buffer for timetagger data. + size_t acq_buf_size = 3 * 4096 * (device.dma_buffer_size() / 4096 / 4); + size_t timetagger_buf_size = device.dma_buffer_size() - acq_buf_size; + + DmaWriteStream acq_stream( + device, + DmaWriteStream::DMA_ACQ, + 0, + acq_buf_size); + DmaWriteStream timetagger_stream( + device, + DmaWriteStream::DMA_TT, + acq_buf_size, + acq_buf_size + timetagger_buf_size); + + DataServer acq_server(io, acq_stream, 5001); + DataServer timetagger_server(io, timetagger_stream, 5002); + + InterruptManager interrupt_manager(io, device); + interrupt_manager.add_callback( + [&acq_server](){ acq_server.handle_interrupt(); }); + interrupt_manager.add_callback( + [&timetagger_server](){ timetagger_server.handle_interrupt(); }); + + DmaErrorMonitor dma_error_monitor( + io, + device, + std::chrono::milliseconds(100)); + + CommandHandler command_handler(io, device, serial_number); + ControlServer control_server(io, command_handler, 5025); + + command_handler.set_control_server(control_server); + command_handler.add_data_server(acq_server); + command_handler.add_data_server(timetagger_server); + + // Disable DMA engine on exit from this function. + struct ScopeGuard { + PuzzleFwDevice& m_device; + ScopeGuard(PuzzleFwDevice& device) : m_device(device) { } + ~ScopeGuard() { m_device.set_dma_enabled(false); } + } scope_guard(device); + + // Clear DMA errors, then enable DMA engine. + device.clear_dma_errors(); + device.set_dma_enabled(true); + + // Enable TCP servers. + control_server.start_server(); + acq_server.start_server(); + timetagger_server.start_server(); + + log(LOG_INFO, "Running, press Ctrl-C to stop"); + io.run(); + + // TODO -- multi-threading + // TODO -- get exit status from command handler + + return 0; +} + + +int main(int argc, char **argv) +{ + static const struct option options[] = { + {"help", 0, 0, 'h'}, + {"serialnr", 1, 0, 1 }, + {nullptr, 0, 0, 0} + }; + + const char *usage_text = "Usage: %s --serialnr SNR\n"; + + std::string serial_number = "0"; + + while (1) { + int c = getopt_long(argc, argv, "h", options, nullptr); + if (c == -1) { + break; + } + + switch (c) { + case 'h': + printf(usage_text, argv[0]); + return 0; + case 1: + serial_number = optarg; + break; + default: + fprintf(stderr, usage_text, argv[0]); + return 1; + } + } + + if (optind < argc) { + fprintf(stderr, "ERROR: Unexpected positional argument\n"); + fprintf(stderr, usage_text, argv[0]); + return 1; + } + + try { + PuzzleFwDevice device; + + VersionInfo version = device.get_version_info(); + printf("Detected PuzzleFW firmware version %d.%d\n", + version.major_version, version.minor_version); + printf(" %u analog input channels\n", + device.get_analog_channel_count()); + printf(" DMA buffer size: %zu bytes\n", + device.dma_buffer_size()); + + return run_remote_control_server(device, serial_number); + + } catch (std::exception& e) { + fprintf(stderr, "ERROR: %s\n", e.what()); + return 1; + } +} + +/* end */ diff --git a/sw/src/userspace/version.hpp b/sw/src/userspace/version.hpp new file mode 100644 index 0000000..e855ba4 --- /dev/null +++ b/sw/src/userspace/version.hpp @@ -0,0 +1,2 @@ +#define PUZZLEFW_SW_MAJOR 0 +#define PUZZLEFW_SW_MINOR 1