Index: LeahiRt/CMakeLists.txt =================================================================== diff -u -r6b422c15085ed605d40cb5dd70177e4657a0ca7e -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- LeahiRt/CMakeLists.txt (.../CMakeLists.txt) (revision 6b422c15085ed605d40cb5dd70177e4657a0ca7e) +++ LeahiRt/CMakeLists.txt (.../CMakeLists.txt) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -11,8 +11,8 @@ set(CMAKE_AUTOMOC ON) -find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Core WebSockets) -find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Core WebSockets) +find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Core Network) +find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Core Network) find_package(Comms HINTS ${CMAKE_CURRENT_SOURCE_DIR}/../lib/Comms REQUIRED) find_package(MsgUtils HINTS ${CMAKE_CURRENT_SOURCE_DIR}/../lib/MsgUtils REQUIRED) @@ -38,6 +38,6 @@ target_link_libraries(${PROJECT_NAME} PRIVATE Comms MsgUtils - Qt${QT_VERSION_MAJOR}::WebSockets Qt${QT_VERSION_MAJOR}::Core + Qt${QT_VERSION_MAJOR}::Network ) Index: LeahiRt/LeahiRtController.cpp =================================================================== diff -u -r42bfa9903a12ff03e4051a6c9a9a0d188606dea7 -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- LeahiRt/LeahiRtController.cpp (.../LeahiRtController.cpp) (revision 42bfa9903a12ff03e4051a6c9a9a0d188606dea7) +++ LeahiRt/LeahiRtController.cpp (.../LeahiRtController.cpp) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -1,50 +1,44 @@ -#include -#include // SQ +#include #include #include "LeahiRtController.h" #include "LeahiMsgDefs.h" -LeahiRtController::LeahiRtController(QObject *parent) : +LeahiRtController::LeahiRtController(const QString &configPath, QObject *parent) : QObject(parent), + _settings(configPath, QSettings::IniFormat), _canInterface(), _canThread(this), - _clientSocket(QString(), QWebSocketProtocol::VersionLatest, this), _msgBuilder(this) - // SQ _protoInterface(), - // SQ _protoThread(this) { _canInterface.init(_canThread); connect(&_canInterface, &Can::CanInterface::didFrameReceive, this, &LeahiRtController::onFrameReceive); - - // SQ _protoInterface.init(_protoThread); - // SQ connect(this, &LeahiRtController::didCanMessageReceive, &_protoInterface, &proto::ProtoInterface::onCanMessageReceive); - - connect(&_clientSocket, &QWebSocket::connected, [&]() { qDebug().noquote() << "Socket connected"; }); } LeahiRtController::~LeahiRtController() { - _clientSocket.close(); + _canThread.quit(); + _canThread.wait(); + + _agentThread.quit(); + _agentThread.wait(); } -void LeahiRtController::openSocket(const QString host, const unsigned int port) +void LeahiRtController::connectToAgent() { - _clientSocket.close(); - _clientSocket.open(QUrl(QString("ws://%1:%2").arg(host).arg(port))); + const QString socketPath = _settings.value("Socket/LocalSocketName", "/tmp/leahi_rt.sock").toString(); + const int reconnectIntervalMs = _settings.value("Socket/ReconnectIntervalMs", 5000).toInt(); + _agentInterface.init(socketPath, reconnectIntervalMs, _agentThread); } void LeahiRtController::onFrameReceive(const QCanBusFrame frame) { const Can::CanId canId = Can::CanId(frame.frameId()); Can::Message &msg = _messages[canId]; - // construct the message from the received frame and determine if a complete message has been received if (_msgBuilder.buildMessage(frame.payload(), msg, canId) && msg.isComplete()) { - // SQ Q_EMIT didCanMessageReceive(QDateTime::currentDateTime(), msg); - // if (_clientSocket.isValid()) { - qDebug().noquote() << QString("Received message with MsgId=0x%1").arg(QString("%1").arg(msg.msgId, 4, 16, QChar('0')).toUpper()); - _clientSocket.sendBinaryMessage(leahi::canMessageToProtobufByteArray(QDateTime::currentDateTime(), QStringLiteral("test_device"), msg)); - // } + qDebug().noquote() << QString("Received message with MsgId=0x%1").arg(QString("%1").arg(msg.msgId, 4, 16, QChar('0')).toUpper()); + const QByteArray payload = leahi::canMessageToProtobufByteArray(QDateTime::currentDateTime(), QStringLiteral("test_device"), msg); + _agentInterface.send(AgentMessage::MsgId::ClinicalData, _txSequence++, payload); msg.clear(); } } Index: LeahiRt/LeahiRtController.h =================================================================== diff -u -r6b422c15085ed605d40cb5dd70177e4657a0ca7e -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- LeahiRt/LeahiRtController.h (.../LeahiRtController.h) (revision 6b422c15085ed605d40cb5dd70177e4657a0ca7e) +++ LeahiRt/LeahiRtController.h (.../LeahiRtController.h) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -4,13 +4,15 @@ #include #include #include +#include +#include #include -#include +#include "AgentInterface.h" +#include "AgentMessage.h" #include "CanInterface.h" #include "CanMessage.h" #include "MessageBuilder.h" -// SQ #include "ProtoInterface.h" using namespace Can; @@ -19,19 +21,20 @@ Q_OBJECT public: - explicit LeahiRtController(QObject *parent = nullptr); + explicit LeahiRtController(const QString &configPath, QObject *parent = nullptr); ~LeahiRtController(); - void openSocket(const QString host, const unsigned int port=80); + void connectToAgent(); private: + QSettings _settings; Can::CanInterface _canInterface; QThread _canThread; - QWebSocket _clientSocket; Can::MessageBuilder _msgBuilder; QMap _messages; - // SQ proto::ProtoInterface _protoInterface; - // SQ QThread _protoThread; + AgentInterface _agentInterface; + QThread _agentThread; + quint16 _txSequence = 0; Q_SIGNALS: void didCanMessageReceive(const QDateTime timestamp, const Can::Message msg); Index: LeahiRt/config/LeahiRt.ini =================================================================== diff -u --- LeahiRt/config/LeahiRt.ini (revision 0) +++ LeahiRt/config/LeahiRt.ini (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -0,0 +1,3 @@ +[Socket] +LocalSocketName=/tmp/leahi_rt.sock +ReconnectIntervalMs=5000 Index: LeahiRt/main.cpp =================================================================== diff -u -r6b422c15085ed605d40cb5dd70177e4657a0ca7e -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- LeahiRt/main.cpp (.../main.cpp) (revision 6b422c15085ed605d40cb5dd70177e4657a0ca7e) +++ LeahiRt/main.cpp (.../main.cpp) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -1,7 +1,8 @@ -#include +#include +#include #include #include -#include +#include #include #include "LeahiRtController.h" @@ -33,9 +34,23 @@ QCoreApplication app(argc, argv); app.setApplicationName("LeahiRt"); app.setApplicationVersion("1.0"); + + QCommandLineParser parser; + parser.setApplicationDescription("Leahi Real-time Cloud Data Transmission daemon."); + parser.addHelpOption(); + parser.addVersionOption(); - LeahiRtController rtController; - rtController.openSocket("localhost", 80); + QCommandLineOption configOption( + {"c", "config"}, + "Path to the configuration INI file.", + "config", + QDir(app.applicationDirPath()).filePath("config/LeahiRt.ini") + ); + parser.addOption(configOption); + parser.process(app); + LeahiRtController rtController(parser.value(configOption)); + rtController.connectToServer(); + return app.exec(); } Index: lib/Comms/CMakeLists.txt =================================================================== diff -u -r8df05d39e7947d508cb49b6a77f2afd7964232ba -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- lib/Comms/CMakeLists.txt (.../CMakeLists.txt) (revision 8df05d39e7947d508cb49b6a77f2afd7964232ba) +++ lib/Comms/CMakeLists.txt (.../CMakeLists.txt) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -11,13 +11,13 @@ find_package(MsgUtils HINTS ${CMAKE_CURRENT_SOURCE_DIR}/../MsgUtils REQUIRED) set(INCLUDES - include/AgentMessage.h + include/AgentInterface.h include/CanInterface.h include/ProtoInterface.h ) set(SRCS - src/AgentMessage.cpp + src/AgentInterface.cpp src/CanInterface.cpp src/ProtoInterface.cpp ) Index: lib/Comms/include/AgentInterface.h =================================================================== diff -u --- lib/Comms/include/AgentInterface.h (revision 0) +++ lib/Comms/include/AgentInterface.h (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -0,0 +1,124 @@ +/*! + * + * Copyright (c) 2024-2026 Diality Inc. - All Rights Reserved. + * \copyright + * THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN + * WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. + * + * \file AgentInterface.h + * \author (original) Stephen Quong + * \date (original) 24-May-2026 + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "AgentMessage.h" + +/*! + * \brief UDS interface to the Connectivity Agent. + * \details Manages the QLocalSocket connection to the Connectivity Agent, + * including automatic reconnection on disconnect or error. + * + * \b Outbound: call send() to build and write an AgentMessage frame + * to the socket. Returns false if the socket is not connected. + * + * \b Inbound: raw bytes received from the socket are fed through + * an AgentMessage parser. When a complete frame is assembled the + * didMessageReceive() signal is emitted with the decoded fields. + * + * Call init() with the socket path, reconnect interval, and an optional + * QThread to run the interface on a dedicated thread. The class handles + * all reconnect attempts internally. + */ +class AgentInterface : public QObject +{ + Q_OBJECT + +public: + /*! + * \brief Construct an AgentInterface. + * \param parent Optional QObject parent. + */ + explicit AgentInterface(QObject *parent = nullptr); + + /*! + * \brief Initialize and connect to the Connectivity Agent UDS socket. + * \details Stores the socket path and reconnect interval, then initiates + * the first connection attempt. Reconnection is handled automatically + * on disconnect or error. + * \param socketPath Path to the Unix domain socket. + * \param reconnectIntervalMs Interval in milliseconds between reconnect attempts. + */ + bool init(const QString &socketPath, int reconnectIntervalMs); + + /*! + * \brief Initialize on a dedicated thread and connect to the Connectivity Agent UDS socket. + * \details Calls init() then moves this object onto \p thread. + * Must be called from the main thread. + * \param socketPath Path to the Unix domain socket. + * \param reconnectIntervalMs Interval in milliseconds between reconnect attempts. + * \param thread The thread to move this object onto. + */ + bool init(const QString &socketPath, int reconnectIntervalMs, QThread &thread); + + /*! + * \brief Send a message to the Connectivity Agent. + * \details Builds a wire-ready AgentMessage frame and writes it to the socket. + * \param msgId Message identifier for Agent MQTT routing. + * \param sequence Caller-managed sequence number. + * \param payload Message payload. Pass an empty QByteArray for zero-length frames. + * \return true if the frame was written to the socket, false if not connected. + */ + bool send(AgentMessage::MsgId msgId, quint16 sequence, const QByteArray &payload = {}); + +public Q_SLOTS: + /*! + * \brief Quit the interface and move back to the main thread for destruction. + */ + void quit(); + +Q_SIGNALS: + /*! + * \brief Emitted when a complete inbound AgentMessage frame has been parsed. + * \param msgId Message identifier from the frame header. + * \param sequence Sequence number from the frame header. + * \param payload Decoded payload bytes, empty for zero-length frames. + */ + void didMessageReceive(AgentMessage::MsgId msgId, quint16 sequence, QByteArray payload); + + /*! + * \brief Emitted when the socket connects successfully. + */ + void didConnect(); + + /*! + * \brief Emitted when the socket disconnects. + */ + void didDisconnect(); + +private Q_SLOTS: + void onConnected(); + void onDisconnected(); + void onError(QLocalSocket::LocalSocketError error); + void onReadyRead(); + void onReconnectTimer(); + +private: + void connectToServer(); + void initThread(QThread &thread); + void quitThread(); + + QLocalSocket _socket; + QTimer _reconnectTimer; + QString _socketPath; + QByteArray _rxBuf; + AgentMessage _rxMsg; + bool _init = false; +}; Fisheye: Tag f8c0a0b1c19dc386b2f88484d53db022270690a7 refers to a dead (removed) revision in file `lib/Comms/include/AgentMessage.h'. Fisheye: No comparison available. Pass `N' to diff? Index: lib/Comms/src/AgentInterface.cpp =================================================================== diff -u --- lib/Comms/src/AgentInterface.cpp (revision 0) +++ lib/Comms/src/AgentInterface.cpp (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -0,0 +1,203 @@ +/*! + * + * Copyright (c) 2024-2026 Diality Inc. - All Rights Reserved. + * \copyright + * THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN + * WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. + * + * \file AgentInterface.cpp + * \author (original) Stephen Quong + * \date (original) 24-May-2026 + * + */ +#include "AgentInterface.h" + +#include +#include +#include + +static void qRegister() { qRegisterMetaType("AgentMessage::MsgId"); } +Q_COREAPP_STARTUP_FUNCTION(qRegister) + +/*! + * \brief AgentInterface::AgentInterface + * \details Constructor. Wires all socket and timer signal/slot connections. + * \param parent Optional QObject parent. + */ +AgentInterface::AgentInterface(QObject *parent) : QObject(parent) +{ + connect(&_socket, &QLocalSocket::connected, this, &AgentInterface::onConnected); + connect(&_socket, &QLocalSocket::disconnected, this, &AgentInterface::onDisconnected); + connect(&_socket, &QLocalSocket::errorOccurred, this, &AgentInterface::onError); + connect(&_socket, &QLocalSocket::readyRead, this, &AgentInterface::onReadyRead); + + _reconnectTimer.setSingleShot(false); + connect(&_reconnectTimer, &QTimer::timeout, this, &AgentInterface::onReconnectTimer); +} + +/*! + * \brief AgentInterface::init + * \details Stores the socket path and reconnect interval, then initiates the first + * connection attempt. Guards against double-initialisation. + * \param socketPath Path to the Unix domain socket. + * \param reconnectIntervalMs Interval in milliseconds between reconnect attempts. + * \return true on success, false if already initialised. + */ +bool AgentInterface::init(const QString &socketPath, int reconnectIntervalMs) +{ + if (_init) { + return false; + } + _init = true; + + _socketPath = socketPath; + _reconnectTimer.setInterval(reconnectIntervalMs); + connectToServer(); + return true; +} + +/*! + * \brief AgentInterface::init + * \details Calls init() then moves this object onto \p thread. Must be called + * from the main thread. + * \param socketPath Path to the Unix domain socket. + * \param reconnectIntervalMs Interval in milliseconds between reconnect attempts. + * \param thread The thread to move this object onto. + * \return true on success, false if already initialised. + */ +bool AgentInterface::init(const QString &socketPath, int reconnectIntervalMs, QThread &thread) +{ + if (!init(socketPath, reconnectIntervalMs)) { + return false; + } + initThread(thread); + return true; +} + +/*! + * \brief AgentInterface::send + * \details Builds a wire-ready AgentMessage frame and writes it to the socket. + * \param msgId Message identifier for Agent MQTT routing. + * \param sequence Caller-managed sequence number. + * \param payload Message payload. Pass an empty QByteArray for zero-length frames. + * \return true if the frame was written to the socket, false if not connected. + */ +bool AgentInterface::send(AgentMessage::MsgId msgId, quint16 sequence, const QByteArray &payload) +{ + if (_socket.state() != QLocalSocket::ConnectedState) { + return false; + } + const QByteArray frame = AgentMessage::build(msgId, sequence, payload); + _socket.write(frame); + _socket.flush(); + return true; +} + +/*! + * \brief AgentInterface::quit + * \details Moves this object back to the main thread for safe destruction. + */ +void AgentInterface::quit() { quitThread(); } + +/*! + * \brief AgentInterface::connectToServer + * \details Initiates a connection to the stored socket path. + */ +void AgentInterface::connectToServer() { _socket.connectToServer(_socketPath, QLocalSocket::ReadWrite); } + +/*! + * \brief AgentInterface::initThread + * \details Names the thread, connects aboutToQuit to quit(), moves this object + * onto the thread, and starts it. Must be called from the main thread. + * \param thread The thread to move this object onto. + */ +void AgentInterface::initThread(QThread &thread) +{ + Q_ASSERT_X(QThread::currentThread() == qApp->thread(), __func__, + "AgentInterface::init must be called from the main thread"); + thread.setObjectName(QString("%1_Thread").arg(metaObject()->className())); + connect(qApp, &QCoreApplication::aboutToQuit, this, &AgentInterface::quit); + moveToThread(&thread); + thread.start(); +} + +/*! + * \brief AgentInterface::quitThread + * \details Moves this object back to the main thread so it can be safely + * destroyed by its owner. + */ +void AgentInterface::quitThread() +{ + if (QThread::currentThread() != qApp->thread()) { + moveToThread(qApp->thread()); + } +} + +/*! + * \brief AgentInterface::onConnected + * \details Stops the reconnect timer and emits didConnect(). + */ +void AgentInterface::onConnected() +{ + qDebug().noquote() << "Agent socket connected to" << _socketPath; + _reconnectTimer.stop(); + emit didConnect(); +} + +/*! + * \brief AgentInterface::onDisconnected + * \details Clears inbound parser state, starts the reconnect timer, and emits didDisconnect(). + */ +void AgentInterface::onDisconnected() +{ + qDebug().noquote() << "Agent socket disconnected — retrying in" << _reconnectTimer.interval() / 1000 << "s"; + _rxBuf.clear(); + _rxMsg.reset(); + _reconnectTimer.start(); + emit didDisconnect(); +} + +/*! + * \brief AgentInterface::onError + * \details Logs the socket error and starts the reconnect timer if not already running. + * \param error The socket error code. + */ +void AgentInterface::onError(QLocalSocket::LocalSocketError error) +{ + qDebug().noquote() << "Agent socket error:" << _socket.errorString() << QString("(%1)").arg(error); + if (!_reconnectTimer.isActive()) { + _reconnectTimer.start(); + } +} + +/*! + * \brief AgentInterface::onReadyRead + * \details Appends incoming bytes to the receive buffer and drains it through + * the AgentMessage parser. Emits didMessageReceive() for each complete frame. + */ +void AgentInterface::onReadyRead() +{ + _rxBuf.append(_socket.readAll()); + + AgentMessage::FeedResult result; + do { + result = _rxMsg.feed(_rxBuf); + if (result == AgentMessage::FeedResult::Complete) { + emit didMessageReceive(_rxMsg.msgId(), _rxMsg.sequence(), _rxMsg.payload()); + _rxMsg.reset(); + } + } while (result == AgentMessage::FeedResult::Complete && !_rxBuf.isEmpty()); +} + +/*! + * \brief AgentInterface::onReconnectTimer + * \details Attempts to reconnect if the socket is in the unconnected state. + */ +void AgentInterface::onReconnectTimer() +{ + if (_socket.state() != QLocalSocket::UnconnectedState) { + return; + } + qDebug().noquote() << "Agent socket reconnecting to" << _socketPath; + connectToServer(); +} Fisheye: Tag f8c0a0b1c19dc386b2f88484d53db022270690a7 refers to a dead (removed) revision in file `lib/Comms/src/AgentMessage.cpp'. Fisheye: No comparison available. Pass `N' to diff? Index: lib/MsgUtils/CMakeLists.txt =================================================================== diff -u -r42bfa9903a12ff03e4051a6c9a9a0d188606dea7 -rf8c0a0b1c19dc386b2f88484d53db022270690a7 --- lib/MsgUtils/CMakeLists.txt (.../CMakeLists.txt) (revision 42bfa9903a12ff03e4051a6c9a9a0d188606dea7) +++ lib/MsgUtils/CMakeLists.txt (.../CMakeLists.txt) (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -34,6 +34,7 @@ set(LEAHI_MSG_CONF ${CMAKE_CURRENT_SOURCE_DIR}/../../data/LeahiUnhandled.conf) set(INCLUDES + include/AgentMessage.h include/CanMessage.h include/crc.h # include/DenaliLogUtils.h @@ -50,6 +51,7 @@ ) set(SRCS + src/AgentMessage.cpp src/crc.cpp # src/DenaliLogUtils.cpp # src/encryption.cpp Index: lib/MsgUtils/include/AgentMessage.h =================================================================== diff -u --- lib/MsgUtils/include/AgentMessage.h (revision 0) +++ lib/MsgUtils/include/AgentMessage.h (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -0,0 +1,189 @@ +/*! + * + * Copyright (c) 2024-2026 Diality Inc. - All Rights Reserved. + * \copyright + * THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN + * WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. + * + * \file AgentMessage.h + * \author (original) Stephen Quong + * \date (original) 24-May-2026 + * + */ +#pragma once + +#include + +/*! + * \brief LeahiRt to Connectivity Agent message framing. + * \details Implements the binary message framing protocol + * between the LeahiRt application and the Agent application. + * + * The class is transport-agnostic — it has no dependency on any + * socket or I/O class and can be used with any byte-stream transport + * (QLocalSocket, QTcpSocket, CAN bus, test harness, etc.). + * + * \b Outbound: AgentMessage::build() constructs a complete wire-ready + * frame from a message ID, sequence number, and optional payload. + * + * \b Inbound: feed() accepts raw bytes from the transport. The caller + * invokes feed() each time bytes arrive and inspects the returned + * FeedResult. When FeedResult::Complete is returned the decoded + * message is available via msgId(), sequence(), and payload(). + * reset() must be called before feeding the next message. + * + * \b Frame \b layout \b — \b header \b only \b (payload_length = 0): + * \code + * Byte: 0 1 2 3 4 5 6-9 10 11 + * ┌─────────┬────────┬────────┬───────────┬────────┐ + * │ AA 55 │ msg_id │ seq_num│ pay_length│hdr_crc │ + * │ sync │uint16BE│uint16BE│ uint32 BE │uint16BE│ + * └─────────┴────────┴────────┴───────────┴────────┘ + * \endcode + * + * \b Frame \b layout \b — \b with \b payload \b (payload_length > 0): + * \code + * ┌── 12-byte header ──┬── N bytes payload ──┬── pay_crc (4 B) ──┐ + * │ (see above) │ uint8[] │ CRC-32/ISO-HDLC │ + * └────────────────────┴─────────────────────┴───────────────────┘ + * \endcode + * + * Header CRC uses CRC-16/CCITT (poly 0x1021, init 0xFFFF, no reflection). + * Payload CRC uses CRC-32/ISO-HDLC (IEEE 802.3, reflected poly 0xEDB88320). + */ +class AgentMessage +{ +public: + /*! + * \brief MQTT routing identifier carried in every frame header. + * \details The Connectivity Agent uses this value as a key into its + * routing table to determine the MQTT topic for the message. + */ + enum class MsgId : quint16 { + ClinicalData = 0x0001, ///< Clinical data → MQTT topic: clinical + Diagnostic = 0x0002, ///< Diagnostic data → MQTT topic: service + Ack = 0x0003, ///< Acknowledgement → response to LeahiRt + Alarms = 0x0004, ///< Alarm data → MQTT topic: alarms + Audit = 0x0005, ///< Audit data → MQTT topic: audit + DeviceLogFile = 0x0006, ///< Device log file → MQTT topic: log + TreatmentLogFile = 0x0007, ///< Treatment log → MQTT topic: tx_log + CloudSyncLogFile = 0x0008, ///< CloudSync log → MQTT topic: cs_log + }; + + /*! + * \brief Result returned by feed() after processing each byte chunk. + */ + enum class FeedResult { + Incomplete, ///< More bytes needed — continue feeding. + Complete, ///< Full valid frame assembled — read accessors, then call reset(). + HeaderError, ///< Header CRC mismatch — frame dropped, state reset automatically. + PayloadError, ///< Payload CRC mismatch or oversized payload — frame dropped, state reset automatically. + }; + + /*! + * \brief Build a complete wire-ready frame. + * \details Constructs the 12-byte header, computes the CRC-16/CCITT header + * integrity field, and when payload is non-empty appends the payload + * followed by its CRC-32/ISO-HDLC integrity field. + * \param msgId Message identifier used by the Agent for MQTT routing. + * \param sequence Caller-managed sequence number for message tracking and ACK correlation. + * \param payload Optional message payload. Pass an empty QByteArray for zero-length + * control frames (e.g. ACK). Payload must not exceed 64 KB. + * \return QByteArray containing the complete frame ready to write to the transport. + */ + static QByteArray build(MsgId msgId, quint16 sequence, const QByteArray &payload = {}); + + /*! + * \brief Feed raw bytes from the transport into the inbound state machine. + * \details Consumes only the bytes needed to advance or complete the current + * frame, then returns. Any bytes beyond what the current frame + * requires are left in \p bytes so the caller can pass the same + * buffer back to feed() to begin parsing the next frame. + * + * The caller owns the buffer and is responsible for draining it: + * \code + * QByteArray buf = _socket->readAll(); + * AgentMessage::FeedResult result; + * do { + * result = _msg.feed(buf); + * if (result == AgentMessage::FeedResult::Complete) { + * handleMessage(_msg.msgId(), _msg.sequence(), _msg.payload()); + * _msg.reset(); + * } + * } while (result == AgentMessage::FeedResult::Complete && !buf.isEmpty()); + * \endcode + * + * \param bytes Buffer of raw bytes from the transport. Bytes consumed by + * this call are removed from the front of the buffer. + * \return FeedResult indicating the parser outcome for this call. + */ + FeedResult feed(QByteArray &bytes); + + /*! + * \brief Message identifier of the last successfully parsed frame. + * \note Valid only after feed() returns FeedResult::Complete. + */ + MsgId msgId() const { return _rxMsgId; } + + /*! + * \brief Sequence number of the last successfully parsed frame. + * \note Valid only after feed() returns FeedResult::Complete. + */ + quint16 sequence() const { return _rxSequence; } + + /*! + * \brief Payload of the last successfully parsed frame. + * \details Returns an empty QByteArray for zero-length frames. + * \note Valid only after feed() returns FeedResult::Complete. + */ + QByteArray payload() const { return _rxPayload; } + + /*! + * \brief Reset inbound parser state. + * \details Clears all accumulated header bytes and decoded fields, returning + * the parser to its initial sync-scanning state. Must be called by + * the caller after consuming a Complete frame. Error results call + * reset() internally. The caller's buffer is unaffected — bytes + * already removed from it by feed() are not restored. + */ + void reset(); + + + private: + /*! + * \brief Compute a CRC-16/CCITT integrity value. + * \details Parameters: poly 0x1021, init 0xFFFF, no input or output + * reflection, no final XOR. Used for header integrity. + * Verify implementation with check value: crc16ccitt("123456789", 9) == 0x29B1. + * \param data Pointer to the input data bytes. + * \param len Number of bytes to process. + * \return 16-bit CRC value. + */ + static quint16 crc16ccitt(const quint8 *data, int len); + + /*! + * \brief Compute a CRC-32/ISO-HDLC integrity value. + * \details Parameters: reflected poly 0xEDB88320 (IEEE 802.3), init 0xFFFFFFFF, + * input and output reflected, final XOR 0xFFFFFFFF. Used for payload integrity. + * Verify implementation with check value: crc32isohdlc("123456789", 9) == 0xCBF43926. + * \param data Pointer to the input data bytes. + * \param len Number of bytes to process. + * \return 32-bit CRC value. + */ + static quint32 crc32isohdlc(const quint8 *data, int len); + + static constexpr int SYNC_SIZE = 2; ///< Sync word length in bytes. + static constexpr quint8 SYNC[SYNC_SIZE] = {0xAA, 0x55}; ///< Sync word byte sequence. + static constexpr int HEADER_SIZE = 12; ///< Fixed header length in bytes. + static constexpr int MSGID_SIZE = 2; ///< msg_id field length in bytes. + static constexpr int SEQUENCE_SIZE = 2; ///< sequence_num field length in bytes. + static constexpr int HEADER_CRC_SIZE = 2; ///< Header CRC field length in bytes. + static constexpr int PAYLOAD_CRC_SIZE = 4; ///< Payload CRC field length in bytes. + static constexpr quint32 MAX_PAYLOAD_LEN = 64 * 1024; ///< Maximum permitted payload length. + + QByteArray _headerBuf; ///< Partial header bytes accumulated across feed() calls. + MsgId _rxMsgId = MsgId::ClinicalData; ///< Parsed msg_id, valid after header is complete. + quint16 _rxSequence = 0; ///< Parsed sequence number, valid after header is complete. + quint32 _rxPayloadLen = 0; ///< Parsed payload_length, valid after header is complete. + QByteArray _rxPayload; ///< Decoded payload of the last complete frame. +}; Index: lib/MsgUtils/src/AgentMessage.cpp =================================================================== diff -u --- lib/MsgUtils/src/AgentMessage.cpp (revision 0) +++ lib/MsgUtils/src/AgentMessage.cpp (revision f8c0a0b1c19dc386b2f88484d53db022270690a7) @@ -0,0 +1,175 @@ +#include "AgentMessage.h" + +#include + +// --------------------------------------------------------------------------- +// Outbound +// --------------------------------------------------------------------------- +QByteArray AgentMessage::build(MsgId msgId, quint16 sequence, const QByteArray &payload) +{ + const quint32 payloadLen = static_cast(payload.size()); + + // Header: sync(2) + msg_id(2) + sequence(2) + payload_length(4) + header_crc(2) = 12 bytes + QByteArray msg(HEADER_SIZE, Qt::Uninitialized); + quint8 *header = reinterpret_cast(msg.data()); + + header[0] = SYNC[0]; + header[1] = SYNC[1]; + qToBigEndian(static_cast(msgId), header + SYNC_SIZE); + qToBigEndian(sequence, header + SYNC_SIZE + MSGID_SIZE); + qToBigEndian(payloadLen, header + SYNC_SIZE + MSGID_SIZE + SEQUENCE_SIZE); + + const quint16 hCrc = crc16ccitt(header, HEADER_SIZE - HEADER_CRC_SIZE); + qToBigEndian(hCrc, header + HEADER_SIZE - HEADER_CRC_SIZE); + + if (payloadLen == 0) { + return msg; + } + + msg.append(payload); + + const quint32 crc = crc32isohdlc(reinterpret_cast(payload.constData()), payload.size()); + QByteArray crcBytes(PAYLOAD_CRC_SIZE, Qt::Uninitialized); + qToBigEndian(crc, reinterpret_cast(crcBytes.data())); + msg.append(crcBytes); + + return msg; +} + +// --------------------------------------------------------------------------- +// Inbound +// \details 1. Bytes are consumed when: scanning for the sync word, a complete header is parsed, +// or a full frame (header + payload) is parsed. +// 2. Header and payload will try to be read in the same feed() call when possible, but they +// may also be parsed across multiple feed() calls if the input buffer is fragmented. +// The caller is responsible for accumulating incoming bytes into the buffer and invoking feed() +// each time new bytes arrive. +// 3. If a header fails CRC validation, only the sync word is discarded. The payload section is +// skipped and the remaining bytes stay in the buffer to be re-scanned on the next feed() call. +// 4. If a payload fails CRC validation, only the header is discarded (pos is not advanced past +// the payload). The payload bytes stay in the buffer to be re-scanned on the next feed() call. +// 5. bytes parameter is modified in-place to remove the consumed bytes. The caller is responsible for +// appending new incoming bytes to the buffer and invoking feed() again. +// 6. If PayloadError is returned, the caller can immediately call feed() again in order to try +// to search for a header in the remaining payload bytes. +// If HeaderError is returned, the caller can call feed() again if there is any data remaining in the +// buffer to search for a header in the remaining bytes. +// --------------------------------------------------------------------------- +AgentMessage::FeedResult AgentMessage::feed(QByteArray &bytes) +{ + int pos = 0; + FeedResult result = FeedResult::Incomplete; + + // scan for a valid header — skipped when _headerBuf is already populated from a prior feed() call + while (_headerBuf.size() == 0 && bytes.size() - pos >= HEADER_SIZE && result != FeedResult::HeaderError) { + if (static_cast(bytes.at(pos)) == SYNC[0] && static_cast(bytes.at(pos + 1)) == SYNC[1]) { + _headerBuf.append(bytes.constData() + pos, HEADER_SIZE); + if (crc16ccitt(reinterpret_cast(_headerBuf.constData()), HEADER_SIZE - HEADER_CRC_SIZE) == + qFromBigEndian(reinterpret_cast(_headerBuf.constData() + HEADER_SIZE - + HEADER_CRC_SIZE))) + { + const quint8 *header = reinterpret_cast(_headerBuf.constData()); + int header_pos = SYNC_SIZE; + _rxMsgId = static_cast(qFromBigEndian(header + header_pos)); + header_pos += MSGID_SIZE; + _rxSequence = qFromBigEndian(header + header_pos); + header_pos += SEQUENCE_SIZE; + _rxPayloadLen = qFromBigEndian(header + header_pos); + pos += HEADER_SIZE; + } + else { + // TODO: log the header CRC failure + _headerBuf.clear(); + pos += SYNC_SIZE; + result = FeedResult::HeaderError; + } + } + else { + ++pos; + } + } + + // process payload if a valid header has been accumulated + if (result != FeedResult::HeaderError && _headerBuf.size() == HEADER_SIZE) { + if (_rxPayloadLen == 0) { + result = FeedResult::Complete; + } + else if (_rxPayloadLen > MAX_PAYLOAD_LEN) { + // TODO: log the oversized payload + _headerBuf.clear(); + result = FeedResult::PayloadError; + } + else if (bytes.size() - pos >= static_cast(_rxPayloadLen) + PAYLOAD_CRC_SIZE) { + const quint8 *payload = reinterpret_cast(bytes.constData() + pos); + if (crc32isohdlc(payload, static_cast(_rxPayloadLen)) == + qFromBigEndian(payload + _rxPayloadLen)) + { + _rxPayload = QByteArray(reinterpret_cast(payload), static_cast(_rxPayloadLen)); + pos += static_cast(_rxPayloadLen) + PAYLOAD_CRC_SIZE; + result = FeedResult::Complete; + } + else { + // TODO: log the payload CRC failure + _headerBuf.clear(); + result = FeedResult::PayloadError; + } + } + } + + // remove the consumed bytes from the input buffer + bytes.remove(0, pos); + + return result; +} + +void AgentMessage::reset() +{ + _headerBuf.clear(); + _rxMsgId = MsgId::ClinicalData; + _rxSequence = 0; + _rxPayloadLen = 0; + _rxPayload.clear(); +} + +// --------------------------------------------------------------------------- +// CRC-16/CCITT (poly 0x1021, init 0xFFFF, no reflection, no final XOR) +// Check value for "123456789": 0x29B1 +// --------------------------------------------------------------------------- +quint16 AgentMessage::crc16ccitt(const quint8 *data, int len) +{ + quint16 crc = 0xFFFF; + for (int i = 0; i < len; ++i) { + crc ^= static_cast(data[i]) << 8; + for (int bit = 0; bit < 8; ++bit) { + if (crc & 0x8000) { + crc = (crc << 1) ^ 0x1021; + } + else { + crc <<= 1; + } + } + } + return crc; +} + +// --------------------------------------------------------------------------- +// CRC-32/ISO-HDLC (IEEE 802.3, reflected poly 0xEDB88320, +// init 0xFFFFFFFF, input/output reflected, final XOR 0xFFFFFFFF) +// Check value for "123456789": 0xCBF43926 +// --------------------------------------------------------------------------- +quint32 AgentMessage::crc32isohdlc(const quint8 *data, int len) +{ + quint32 crc = 0xFFFFFFFF; + for (int i = 0; i < len; ++i) { + crc ^= data[i]; + for (int bit = 0; bit < 8; ++bit) { + if (crc & 1) { + crc = (crc >> 1) ^ 0xEDB88320; + } + else { + crc >>= 1; + } + } + } + return crc ^ 0xFFFFFFFF; +}