"""CloudSync service entry point.

Wires together the UI<->CloudSync file busses, the network request worker and
the heartbeat provider, and exposes the REST API used during device
registration.
"""
import argparse
import logging
import socket
import sys

from flask import Flask, Response, request
from flask_restplus import Api, Resource, fields, reqparse

from cloudsync.utils.helpers import *
from cloudsync.utils.globals import *
from cloudsync.utils.reachability import ReachabilityProvider
from cloudsync.busses.file_output_bus import FileOutputBus
from cloudsync.common.enums import NetworkRequestType
from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler
from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler
from cloudsync.busses.file_input_bus import FileInputBus
from cloudsync.utils.heartbeat import HeartBeatProvider

VERSION = "0.2.3"

arguments = sys.argv
# The numeric logging level is the first command-line argument. Fall back to
# INFO instead of dying with an IndexError when it is omitted.
log_level = int(arguments[1]) if len(arguments) > 1 else logging.INFO


logging.basicConfig(filename='cloudsync.log', level=log_level)

app = Flask(__name__)
api = Api(app=app, version=VERSION, title="CloudSync Registration API",
          description="Interface with DIA Manufacturing Tool")

# Iterate over a copy: removing handlers from the list being iterated skips
# every other handler.
for handler in list(app.logger.handlers):
    app.logger.removeHandler(handler)
handler = logging.FileHandler('cloudsync.log')
handler.setLevel(log_level)
default_formatter = logging.Formatter('[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
handler.setFormatter(default_formatter)

app.logger.addHandler(handler)
app.logger.setLevel(log_level)

g_utils.add_logger(app.logger)

g_config = helpers_read_config(CONFIG_PATH)

reachability_provider = ReachabilityProvider(logger=app.logger)

g_utils.add_reachability_provider(reachability_provider=reachability_provider)

output_channel = FileOutputBus(logger=app.logger, max_size=1, file_channels_path=UI2CS_FILE_CHANNELS_PATH)

network_request_handler = NetworkRequestHandler(logger=app.logger, max_size=1, output_channel=output_channel,
                                                reachability_provider=reachability_provider)
message_handler = UICSMessageHandler(logger=app.logger, max_size=20, network_request_handler=network_request_handler,
                                     output_channel=output_channel, reachability_provider=reachability_provider)
ui_cs_bus = FileInputBus(logger=app.logger, file_channels_path=UI2CS_FILE_CHANNELS_PATH, input_channel_name="inp.buf",
                         g_config=g_config, message_handler=message_handler)

heartbeat_provider = HeartBeatProvider(logger=app.logger, network_request_handler=network_request_handler,
                                       output_channel=output_channel)

# Heartbeats are only sent once the device has left registration mode.
heartbeat_provider.send_heartbeat = g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation'

parser = reqparse.RequestParser()


def _enqueue_network_request(request_type, payload):
    """Build a NetworkRequest for the current HTTP request and queue it.

    :param request_type: the NetworkRequestType to dispatch
    :param payload: (dict) the request payload handed to the worker
    :return: True when the request was queued, False when the queue was full
    """
    r = NetworkRequest(request_type=request_type,
                       url=request.url,
                       payload=payload,
                       method=request.method,
                       g_config=g_config,
                       )
    queued = network_request_handler.enqueue_request(r)
    if not queued:
        # NOTE(review): the HTTP status is intentionally left unchanged for
        # existing clients; consider answering 503 here once the
        # manufacturing tool can handle it.
        app.logger.error("Network request queue is full, dropped request: {0}".format(r))
    return queued


# START - REGISTRATION ENDPOINTS

@api.route("/version")
class Version(Resource):

    @api.response(200, "Success")
    def get(self):
        """
        Report the CloudSync version
        """
        app.logger.info("Received /version HTTP request")
        return {
            "version": VERSION
        }, 200


@api.route("/credentials")
class Credentials(Resource):

    FIELD_CREDENTIALS = api.model("Credentials", {
        "certificate": fields.String,
        "public_key": fields.String,
        "private_key": fields.String,
    })

    @api.doc(body=FIELD_CREDENTIALS, validate=True)
    @api.response(200, "Success")
    @api.response(400, "Validation Error")
    def put(self):
        """
        4. Update the device's certificate and keys
        Manufacturing Tool -> Device
        """
        app.logger.info("Received /credentials HTTP request")
        # A missing, malformed or non-object body must yield a 400, not an
        # AttributeError (HTTP 500) on payload.get().
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        required_params = ("certificate", "private_key", "public_key")
        invalid_params = [name for name in required_params if payload.get(name) is None]
        if invalid_params:
            return {"invalidAttributes": invalid_params}, 400

        _enqueue_network_request(NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, payload)

        return {
            "invalidAttributes": [],
        }, 200


@api.route("/validate")
class Validate(Resource):

    @api.response(200, "Success")
    def head(self):
        """
        5. Initiate device connectivity test
        Manufacturing Tool -> Device
        """
        app.logger.info("Received /validate HTTP request")

        _enqueue_network_request(NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST, {})

        # no body is returned
        return {}, 200


@api.route("/reset")
class Reset(Resource):

    @api.response(200, "Success")
    def head(self):
        """
        13. Initiate a factory reset
        Assumes the connectivity test has already been completed

        Manufacturing Tool -> Device
        """
        app.logger.info("Received /reset HTTP request")

        _enqueue_network_request(NetworkRequestType.MFT2CS_REQ_FACTORY_RESET, {})

        heartbeat_provider.send_heartbeat = True

        return {}, 200

# END - REGISTRATION ENDPOINTS


@api.route("/config")
class Config(Resource):

    @api.response(200, "Success")
    def get(self):
        """
        Get the current json config
        """
        app.logger.info("Received /config HTTP request")
        return g_config, 200


# Previously this class was also called ``Config`` and silently shadowed the
# /config resource class at module level. The URL is unchanged.
@api.route("/token")
class Token(Resource):

    @api.response(200, "Success")
    def get(self):
        """
        Get the currently stored token
        """
        app.logger.info("Received /token HTTP request")
        return helpers_read_access_token(DEVICE_KEBORMED_ACCESS_TOKEN_PATH), 200


@api.route("/logs")
class Logs(Resource):

    @api.hide
    def get(self):
        """
        Return the CloudSync log file as plain text
        """
        app.logger.info("Received /logs HTTP request")
        with open(CS_LOG_FILE, 'r') as f:
            content = f.read()
        return Response(content, mimetype="text/plain")


def main():
    """Start the Flask development server; only DEBUG mode is implemented."""
    if DEBUG:
        app.run(debug=False, use_reloader=False, host="0.0.0.0", port=g_config[CONFIG_DEVICE][CONFIG_DEVICE_PORT])
    elif VM:
        app.logger.error("VM mode not implemented yet.")
    elif SERVER:
        app.logger.error("Server mode not implemented yet.")
    else:
        app.logger.error("Production mode not implemented yet.")


if __name__ == '__main__':
    main()
class FileInputBus:
    """Watches the file-channel directory and forwards new UI messages.

    A daemon thread listens for inotify events on ``file_channels_path``.
    Whenever a file whose name ends with ``input_channel_name`` is written,
    every line with a message id (second comma-separated field) greater than
    the last accepted id is wrapped in a ``UICSMessage`` and handed to the
    message handler.
    """

    def __init__(self, logger: Logger, file_channels_path: str, input_channel_name: str, g_config: dict,
                 message_handler: "UICSMessageHandler"):
        """
        :param logger: logger used for diagnostics
        :param file_channels_path: directory that is watched for input files
        :param input_channel_name: file name suffix identifying input files
        :param g_config: the cloudsync configuration dictionary
        :param message_handler: receives the parsed messages
        """
        self.last_input_message_id = 0

        self.logger = logger
        self.file_channels_path = file_channels_path
        self.input_channel_name = input_channel_name
        self.g_config = g_config
        self.message_handler = message_handler

        self.i = inotify.adapters.Inotify()
        self.i.add_watch(self.file_channels_path)

        self.thread = Thread(target=self.input_channel_handler, daemon=True)
        self.event = Event()
        self.thread.start()

    @staticmethod
    def _parse_message_id(line: str):
        """Return the integer message id of ``line`` or None when malformed.

        The id is the second comma-separated field of the line.
        """
        message_parameters = line.strip().split(',')
        try:
            return int(message_parameters[1])
        except (IndexError, ValueError):
            return None

    def _process_input_file(self, file_path: str) -> None:
        """Enqueue every not-yet-seen message found in ``file_path``.

        Unreadable files and malformed lines are logged and skipped so that
        the watcher thread keeps running.
        """
        try:
            # Read everything up front so the handle is closed before the
            # (potentially slow) message handling starts.
            with open(file_path) as f:
                lines = f.readlines()
        except IOError as er:
            # er.args starts with the integer errno, so it cannot be joined.
            self.logger.error('Opening input file error: {0}'.format(er))
            return

        # Ids are compared against the id known when the file was opened;
        # the stored id only advances once the whole file has been scanned.
        highest_accepted_id = self.last_input_message_id

        for line in lines:
            stripped_line = line.strip()
            if not stripped_line:
                continue
            message_id = self._parse_message_id(stripped_line)
            if message_id is None:
                self.logger.warning('Skipping malformed input message: {0}'.format(stripped_line))
                continue
            self.logger.debug("message parameters: {0}".format(stripped_line.split(',')))
            if message_id > self.last_input_message_id:
                new_message = UICSMessage(stripped_line, self.g_config)
                if self.message_handler.enqueue_message(new_message):
                    highest_accepted_id = max(highest_accepted_id, message_id)

        self.last_input_message_id = highest_accepted_id

    def input_channel_handler(self):
        """Thread body: react to write events on the input channel file."""
        for event in self.i.event_gen(yield_nones=False):
            (_, type_names, path, filename) = event
            was_written = ('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)
            if not (was_written and filename.endswith(self.input_channel_name)):
                continue
            try:
                self._process_input_file(self.file_channels_path + "/" + filename)
            except Exception:
                # Thread boundary: one bad file must not stop the watcher.
                self.logger.exception('Unexpected error while processing input file {0}'.format(filename))
class RootEnum(Enum):
    """Base enum adding value-lookup helpers that never raise on bad input."""

    @classmethod
    def has_value(cls, value):
        """Return True when ``value`` is the value of one of the members."""
        return value in cls._value2member_map_

    @classmethod
    def mapped_int_value(cls, value):
        """Return the member whose value is ``value``, or None if there is none."""
        try:
            return cls._value2member_map_.get(value)
        except TypeError:
            # Unhashable lookups (e.g. a list) cannot match any member.
            return None

    @classmethod
    def mapped_str_value(cls, value):
        """Return the member whose value equals ``int(value)``, or None.

        None is returned both when ``value`` cannot be converted to an int and
        when no member has that value.
        """
        try:
            return cls._value2member_map_.get(int(value))
        except (TypeError, ValueError, OverflowError):
            return None


@unique
class HDOpModes(RootEnum):
    MODE_FAUL = 0  # Fault mode
    MODE_SERV = 1  # Service mode
    MODE_INIT = 2  # Initialization & POST mode
    MODE_STAN = 3  # Standby mode
    MODE_TPAR = 4  # Treatment Parameters mode
    MODE_PRET = 5  # Pre-Treatment mode
    MODE_TREA = 6  # Treatment mode
    MODE_POST = 7  # Post-Treatment mode
    NUM_OF_MODES = 8  # Number of HD operation modes


@unique
class HDOpSubModes(RootEnum):
    SUBMODE_START = 0
    SUBMODE_WAIT_FOR_TREATMENT = 1
    SUBMODE_WAIT_FOR_DISINFECT = 2
    SUBMODE_DG_FLUSH_IN_PROGRESS = 3
    SUBMODE_DG_HEAT_DISINFECT_IN_PROGRESS = 4
    SUBMODE_DG_CHEMICAL_DISINFECT_IN_PROGRESS = 5
    NUM_OF_MODES = 6


@unique
class DeviceStates(RootEnum):
    INACTIVE_NOT_OK = 1
    INACTIVE_OK = 2
    ACTIVE_OK = 3
    ACTIVE_READY = 4
    ACTIVE_IN_TREATMENT = 5
    ACTIVE_NOT_READY = 6
    ACTIVE_NOT_OK = 7
    DECOMMISSIONED = 8
    UNKNOWN_STATE = 9


@unique
class InboundMessageIDs(RootEnum):
    # REGISTRATION
    UI2CS_REQ_REGISTRATION = 1001
    UI2CS_SEND_DEVICE_INFO = 1002
    UI2CS_SEND_CREDENTIALS_SAVED = 1003
    UI2CS_SEND_CREDENTIALS = 1004
    UI2CS_SEND_FACTORY_RESET_CONFIRMATION = 1005

    # OPERATION
    UI2CS_SEND_DEVICE_STATE = 1006
    UI2CS_SEND_TREATMENT_REPORT = 1007


@unique
class OutboundMessageIDs(RootEnum):
    # REGISTRATION
    CS2UI_REQ_DEVICE_INFO = 2002
    CS2UI_REQ_SAVE_CREDENTIALS = 2003
    CS2UI_REQ_CREDENTIALS = 2004
    CS2UI_REQ_FACTORY_RESET = 2005

    # OPERATION
    CS2UI_REQ_DEVICE_STATE = 2006
    CS2UI_REQ_TX_CODE_DISPLAY = 2008


@unique
class NetworkRequestType(RootEnum):
    MFT2CS_REQ_SET_CREDENTIALS = 4
    MFT2CS_REQ_INIT_CONNECTIVITY_TEST = 5
    MFT2CS_REQ_FACTORY_RESET = 13
    CS2MFT_REQ_REGISTRATION = 101
    CS2DCS_REQ_SET_DEVICE_STATE = 201
    CS2DCS_REQ_SEND_TREATMENT_REPORT = 202
"https://device-api.diality.integration.kebormed.com", + "url_device_identity": "https://device-identity.diality.integration.kebormed.com:31400/auth/realms/Main/protocol/openid-connect/token", + "dia_org_id": 1 + }, + "device": { + "ip": "192.168.10.153", + "port": 5001, + "name": "hd-HD_Serial_00006", + "hd_serial": "hd-HD_Serial_00006", + "dg_serial": "dg-DG_Serial_00006", + "sw_version": "develop.06011728", + "mode": "registration", + "device_state": "INACTIVE_NOT_OK" + } +} \ No newline at end of file Index: cloudsync/config/config_QA.json =================================================================== diff -u --- cloudsync/config/config_QA.json (revision 0) +++ cloudsync/config/config_QA.json (revision 5e2872a15167f8d66d074d106f3afc78d1da3dea) @@ -0,0 +1,19 @@ +{ + "kebormed_paas": { + "idp_client_secret": "94bf9193-4beb-4b3b-b973-5ee807d1b3a3", + "url_mft": "http://192.168.10.216:3000", + "url_dcs": "https://device-api.diality.qa.kebormed.com", + "url_device_identity": "https://device-identity.diality.qa.kebormed.com:31400/auth/realms/Main/protocol/openid-connect/token", + "dia_org_id": 1 + }, + "device": { + "ip": "192.168.10.153", + "port": 5001, + "name": "hd-HD_Serial_00006", + "hd_serial": "hd-HD_Serial_00006", + "dg_serial": "dg-DG_Serial_00006", + "sw_version": "develop.06011728", + "mode": "registration", + "device_state": "INACTIVE_NOT_OK" + } +} \ No newline at end of file Index: cloudsync/config/treatment_report_template.json =================================================================== diff -u --- cloudsync/config/treatment_report_template.json (revision 0) +++ cloudsync/config/treatment_report_template.json (revision 5e2872a15167f8d66d074d106f3afc78d1da3dea) @@ -0,0 +1,55 @@ +{ + "organizationId": "", + "serialNumber": "", + "data": { + "patient": { + "id": "" + }, + "treatment": { + "parameters": { + "bloodFlowRate": 0, + "dialysateFlowRate": 0, + "treatmentDuration": 0, + "dialysate": { + "acidCode": "", + "bicarbCode": "", + "K": 0, 
class NetworkRequestHandler:
    """Runs queued network requests one at a time on a background thread.

    Requests are added with :meth:`enqueue_request`; the daemon thread started
    in ``__init__`` wakes up on ``self.event`` and passes each queued request
    to :meth:`handle_request`.
    """

    def __init__(self, logger: Logger, max_size, output_channel, reachability_provider):
        """
        :param logger: logger used for diagnostics
        :param max_size: maximum number of requests waiting in the queue
        :param output_channel: CS->UI output channel (must offer enqueue_message)
        :param reachability_provider: object exposing a ``reachability`` flag
        """
        self.logger = logger
        self.reachability_provider = reachability_provider
        self.logger.info('Created Network Request Handler')
        self.output_channel = output_channel
        self.queue = deque(maxlen=max_size)  # Thread safe
        self.thread = Thread(target=self.scheduler, daemon=True)
        self.event = Event()
        self.thread.start()

    def scheduler(self) -> None:
        """
        Continuously monitors the event flag to check for new requests
        :return: None
        """
        while True:
            self.event.wait()
            # Clear BEFORE draining: a request enqueued while we are busy sets
            # the flag again, so it can never be lost. Clearing after the
            # emptiness check could strand a request in the queue, and with a
            # full queue every later enqueue would then be rejected.
            self.event.clear()
            while True:
                try:
                    req = self.queue.popleft()
                except IndexError:
                    break
                try:
                    self.handle_request(req)
                except Exception:
                    # Thread boundary: a failing request must not kill the
                    # only worker thread.
                    self.logger.exception("Unhandled error while processing request {0}".format(req))

    def enqueue_request(self, req: "NetworkRequest") -> bool:
        """
        :param req: the request to add to the queue
        :return: True upon success, False when the queue is full
        """
        if len(self.queue) < self.queue.maxlen:
            self.queue.append(req)
            self.event.set()
            return True
        return False

    def handle_request(self, req: "NetworkRequest") -> None:
        """
        Dispatches a single request according to its request type.

        :param req: the request taken from the queue
        :return: the last response for treatment reports, otherwise None
        """
        device_config = req.g_config[CONFIG_DEVICE]
        kebormed_config = req.g_config[CONFIG_KEBORMED]

        if req.request_type == NetworkRequestType.CS2MFT_REQ_REGISTRATION:
            response = cmd_outgoing_register(device_name=device_config[CONFIG_DEVICE_NAME],
                                             hd_serial=device_config[CONFIG_DEVICE_HD_SERIAL],
                                             dg_serial=device_config[CONFIG_DEVICE_DG_SERIAL],
                                             sw_version=device_config[CONFIG_DEVICE_SW_VERSION],
                                             manf_tool_base_url=kebormed_config[CONFIG_KEBORMED_MFT_URL],
                                             data_source_id=kebormed_config[CONFIG_KEBORMED_DIA_ORG_ID],
                                             headers=req.headers)

            self.logger.debug("DRT Request registration resp: {0}".format(response))
        elif req.request_type == NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS:
            cmd_incoming_set_credentials(certificate=req.payload.get("certificate"),
                                         private_key=req.payload.get("private_key"),
                                         public_key=req.payload.get("public_key"),
                                         output_channel=self.output_channel)
        elif req.request_type == NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST:
            url_validate = urllib.parse.urljoin(kebormed_config[CONFIG_KEBORMED_DCS_URL], "api/device/validate")
            cmd_incoming_initiate_connectivity_test(
                url_token=kebormed_config[CONFIG_KEBORMED_DEVICE_IDENTITY_URL],
                url_validate=url_validate,
                hd_serial=device_config[CONFIG_DEVICE_HD_SERIAL],
                dg_serial=device_config[CONFIG_DEVICE_DG_SERIAL],
                sw_version=device_config[CONFIG_DEVICE_SW_VERSION],
                g_config=req.g_config)
        elif req.request_type == NetworkRequestType.MFT2CS_REQ_FACTORY_RESET:
            # A factory reset ends registration: persist the mode switch first.
            device_config[CONFIG_DEVICE_MODE] = 'operation'
            helpers_write_config(CONFIG_PATH, req.g_config)
            cmd_incoming_factory_reset(output_channel=self.output_channel)
        # OPERATION MODE
        elif req.request_type == NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE:
            self._set_device_state(req)
        elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT:
            return self._send_treatment_report(req)
        else:
            self.logger.warning("Request type {0} not supported".format(req.request_type))

    def _get_dcs_access_token(self, req, base_url):
        """Return a valid DCS access token for the configuration in ``req``."""
        kebormed_config = req.g_config[CONFIG_KEBORMED]
        return self.get_valid_token(identity_url=kebormed_config[CONFIG_KEBORMED_DEVICE_IDENTITY_URL],
                                    token_verification_url=urllib.parse.urljoin(base_url, "api/common/info"),
                                    client_secret=kebormed_config[CONFIG_KEBORMED_IDP_CLIENT_SECRET])

    def _set_device_state(self, req) -> None:
        """Report the configured device state to DCS."""
        device_state = req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE]
        base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
        device_state_url = urllib.parse.urljoin(base_url, "/api/device/state/{0}".format(device_state))

        access_token = self._get_dcs_access_token(req, base_url)

        response = cmd_outgoing_set_device_state(url=device_state_url,
                                                 access_token=access_token)

        self.logger.debug("DCS Request device state response: {0}".format(response.text))

    def _send_treatment_report(self, req):
        """Upload the treatment report in ``req.payload`` and show its code on the UI.

        :return: the response of the last request that was performed
        """
        treatment_log_json = req.payload
        patient_emr_id = treatment_log_json['data']['patient']['id']

        device_config = req.g_config[CONFIG_DEVICE]
        hd_serial = device_config[CONFIG_DEVICE_HD_SERIAL]
        dg_serial = device_config[CONFIG_DEVICE_DG_SERIAL]
        sw_version = device_config[CONFIG_DEVICE_SW_VERSION]

        base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
        validate_url = urllib.parse.urljoin(base_url, "api/device/validate")
        data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data")

        access_token = self._get_dcs_access_token(req, base_url)

        # Step #1 - get organization id for current device

        response = cmd_outgoing_validate_device(access_token=access_token,
                                                hd_serial_number=hd_serial,
                                                dg_serial_number=dg_serial,
                                                sw_version=sw_version,
                                                url=validate_url)

        # A missing "invalidAttributes" key is treated like an empty list.
        invalid_attributes = response.get("invalidAttributes", None) or []

        if invalid_attributes:
            self.logger.error(
                "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid device for treatment. Invalid fields: {3}".format(
                    hd_serial, dg_serial, sw_version, invalid_attributes))
            return response

        organization_id = response.get("associatedOrganizationId", None)
        treatment_log_json['organizationId'] = organization_id

        # Step #2 - Set patient by patient_emr_id

        # The same URL is used to look the patient up and to create it.
        patient_with_emr_id_exists_url = urllib.parse.urljoin(base_url,
                                                              "api/device/organization/{0}/patient/{1}".format(
                                                                  organization_id, patient_emr_id))
        create_temporary_patient_url = patient_with_emr_id_exists_url

        # Step #2a - Check if patient exists by patient_emr_id

        response = cmd_outgoing_check_if_patient_wit_emr_id_exists(access_token=access_token,
                                                                   url=patient_with_emr_id_exists_url)

        # Step #2b - If patient with emr_id doesn't exist, create temporary patient

        if response.status_code == 200:
            patient_id = response.json().get("id", None)
        elif response.status_code == 404:
            response = cmd_outgoing_create_temporary_patient(access_token=access_token,
                                                             url=create_temporary_patient_url)
            # NOTE(review): presumably this helper returns parsed JSON (a dict)
            # rather than a Response - confirm against its definition.
            patient_id = response.get("id", None)
        else:
            self.logger.error(
                "Patient didn't exist and a temporary patient couldn't be created (status code {0})".format(
                    response.status_code))
            return response

        patient_device_association_url = urllib.parse.urljoin(base_url,
                                                              "/api/device/organization/{0}/patient/{1}/association".format(
                                                                  organization_id, patient_id))
        # Step #3 - Check if device is associated to patient and if not associate it

        response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url,
                                                               access_token=access_token,
                                                               associate=True)
        # Step #4 - Send treatment report

        # NOTE(review): both timestamps are placed 30s/31s in the future; the
        # reason is not visible here - confirm before changing.
        treatment_log_json['generatedAt'] = int(round(time() * 1000)) + 30000
        treatment_log_json['completedAt'] = int(round(time() * 1000)) + 31000

        treatment_log_json = json.dumps(treatment_log_json)

        self.logger.debug("treatment log: {0}".format(treatment_log_json))

        response = cmd_outgoing_send_treatment_report(url=data_submission_url,
                                                      access_token=access_token,
                                                      treatment_log=treatment_log_json)
        treatment_id = response.json().get("reference", None)

        # Step #5 - Remove patient/device association

        response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url,
                                                               access_token=access_token,
                                                               associate=False)
        # Step #6 - Send TX code to UI app

        if treatment_id is None:
            # Concatenating None would raise; there is no code to display.
            self.logger.error("Treatment report response contained no reference, no TX code sent to UI")
            return response

        message_body = "{0},1,{1}".format(OutboundMessageIDs.CS2UI_REQ_TX_CODE_DISPLAY.value, treatment_id)
        self.output_channel.enqueue_message(message_body)

        return response

    def get_valid_token(self, identity_url, token_verification_url, client_secret):
        """Return the stored access token, requesting a new one when needed.

        A new token is requested with the device certificate when no token is
        stored or when the verification endpoint answers 401.

        :param identity_url: URL used to request a new token
        :param token_verification_url: URL used to verify the stored token
        :param client_secret: the identity provider client secret
        :return: the access token
        """
        access_token = helpers_get_stored_token()

        if access_token is None:
            needs_new_token = True
        else:
            response = cmd_outgoing_verify_token(url=token_verification_url,
                                                 access_token=access_token)
            needs_new_token = response.status_code == 401

        if needs_new_token:
            access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509,
                                                                path_private_key=CREDENTIALS_PRIVATE_KEY,
                                                                save=True,
                                                                url=identity_url,
                                                                client_secret=client_secret)

        # NOTE(review): the credential files are deleted after every call; a
        # later token renewal presumably depends on them being provided again
        # - confirm this is intended.
        self.remove_secrets()
        return access_token

    def wait_for_network(self, wait_time):
        """Wait up to ``wait_time`` seconds for the network to become reachable.

        :param wait_time: maximum number of seconds to wait
        :return: True when the network is reachable, False otherwise
        """
        # Imported locally: this module only imports ``time`` from ``time``.
        from time import sleep

        waited = 0
        while not self.reachability_provider.reachability and waited < wait_time:
            waited += 1
            sleep(1)
        # Report the actual state instead of comparing the counter: the
        # network may come up during the last second (or wait_time may be 0).
        return bool(self.reachability_provider.reachability)

    def remove_secrets(self):
        """Delete the locally stored certificate and key files, if present."""
        try:
            for secret_path in (CREDENTIALS_CERTIFICATE_X509,
                                CREDENTIALS_PRIVATE_KEY,
                                CREDENTIALS_PUBLIC_KEY):
                if os.path.exists(secret_path):
                    os.remove(secret_path)
            self.logger.info('Credentials deleted locally')
        except OSError as er:
            # er.args starts with the integer errno, so it cannot be joined.
            self.logger.error('Error removing secrets: {0}'.format(er))
@log_func
def cmd_incoming_initiate_connectivity_test(url_token: str,
                                            url_validate: str,
                                            hd_serial: str,
                                            dg_serial: str,
                                            sw_version: str,
                                            g_config: dict) -> None:
    """
    Initiates the connectivity test on the device
    # Step 6. Device Auth (POST /auth/...)
    # Step 7. Device Auth Response (access token)
    # Step 8. Validate Device (POST /api/device/validate)
    # Step 9. Validate Device Response (list of invalid fields)
    # Step 10. Validation Result (POST /validation) (list of invalid fields)
    :param url_token: The kebormed request new token url
    :param url_validate: The device validation url
    :param hd_serial: The hd serial number
    :param dg_serial: The dg serial number
    :param sw_version: The software version
    :param g_config: The configuration dictionary
    :return: None
    """
    # Step 6. Device Auth (POST /auth/...)
    # Step 7. Device Auth Response (access token)
    client_secret = g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
    access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=TEMP_CREDENTIALS_CERTIFICATE_X509,
                                                        path_private_key=TEMP_CREDENTIALS_PRIVATE_KEY,
                                                        save=True,
                                                        url=url_token,
                                                        client_secret=client_secret)
    if access_token is None:
        g_utils.logger.debug("Access token is None.")
        return

    # The token is a bearer credential: never write its value to the log.
    g_utils.logger.debug("Access token obtained.")

    # Step 8. Validate Device (POST /api/device/validate)
    # Step 9. Validate Device Response (list of invalid fields)
    response = cmd_outgoing_validate_device(access_token=access_token,
                                            hd_serial_number=hd_serial,
                                            dg_serial_number=dg_serial,
                                            sw_version=sw_version,
                                            url=url_validate)
    g_utils.logger.debug("Response: {0}".format(response))
    invalid_attributes = response.get("invalidAttributes", None)

    g_utils.logger.debug("Invalid fields: {0}".format(invalid_attributes))
    if invalid_attributes is not None:
        # Step 10. Validation Result (POST /validation) (list of invalid fields)
        json_resp = cmd_outgoing_validation_result(invalid_attributes, hd_serial, g_config)
        g_utils.logger.debug("Json response: {0}".format(json_resp))


@log_func
def cmd_incoming_factory_reset(output_channel) -> None:
    """
    Initiates a factory reset on the device

    Removes the temporary credential files (if present) and asks the UI to
    perform the factory reset.

    :param output_channel: CS_UI output channel
    :return: None
    """
    try:
        for temp_path in (TEMP_CREDENTIALS_CERTIFICATE_X509,
                          TEMP_CREDENTIALS_PRIVATE_KEY,
                          TEMP_CREDENTIALS_PUBLIC_KEY):
            if os.path.exists(temp_path):
                os.remove(temp_path)
    except OSError as er:
        # This is a module-level function, so there is no ``self``; er.args
        # starts with the integer errno and therefore cannot be joined.
        g_utils.logger.error('Error removing secrets: {0}'.format(er))

    message_body = str(OutboundMessageIDs.CS2UI_REQ_FACTORY_RESET.value) + ',0'
    output_channel.enqueue_message(message_body)


@log_func
def cmd_incoming_update_manufacturing_tool_base_url(url: str, g_config: dict) -> None:
    """
    Updates the manufacturing tool base url

    :param url: The new url
    :param g_config: The cloudsync config
    :return: None
    """
    # NOTE(review): the request handler reads this setting through
    # CONFIG_KEBORMED_MFT_URL - confirm that CONFIG_KEBORMED_MANF_TOOL_URL
    # exists and refers to the same config key.
    helpers_update_config(path=CONFIG_PATH,
                          config=g_config,
                          group=CONFIG_KEBORMED,
                          section=CONFIG_KEBORMED_MANF_TOOL_URL,
                          value=url)
@log_func
def cmd_outgoing_set_patient_device_association(url: str, access_token: str, associate: bool) -> requests.Response:
    """
    Creates or removes the patient/device association

    When associating, the "<url>/exists" resource is probed first and the
    association is only created (PUT) when that probe answers 404; otherwise
    the probe response itself is returned. When not associating, the
    association is deleted.

    :param url: The association url
    :param access_token: The bearer token used for authorization
    :param associate: True to create the association, False to delete it
    :return: requests.Response of the last request made
    """
    headers = {
        'Authorization': "Bearer {0}".format(access_token),
        'Content-Type': 'application/json'
    }
    payload = {}

    # NOTE(review): verify=False disables TLS certificate verification on every
    # request below - confirm this is intended outside of development.
    if not associate:
        return requests.delete(url=url,
                               headers=headers,
                               data=payload,
                               verify=False)

    # urljoin(url, "/exists") would replace the entire path of `url` with
    # "/exists"; append the segment explicitly instead.
    exists_url = url.rstrip("/") + "/exists"
    resp = requests.head(url=exists_url,
                         headers=headers,
                         data=payload,
                         verify=False)

    if resp.status_code == 404:
        resp = requests.put(url=url,
                            headers=headers,
                            data=payload,
                            verify=False)
    return resp
@log_func
def cmd_outgoing_get_new_token(username: str, password: str, save: bool, url: str) -> str:
    """
    Performs the following steps in the device registration sequence diagram:
    Step 6. Device Auth
    Step 7. Device Auth Response

    :param username: The username required for login
    :param password: The password required for login
    :param save: If True, save the token response to the access token file
    :param url: The kebormed url to request the new token
    :return: The new token, or None when the response carries no token or
             is not valid JSON
    """
    # Percent-encode the credentials: characters such as '&', '=', '+' or '%'
    # would otherwise corrupt the form body or inject extra fields.
    payload = ("grant_type=password&scope=openid%20profile%20email%20"
               "offline_access&username={0}&password={1}&client_id=frontend-client").format(
        urllib.parse.quote(username, safe=''),
        urllib.parse.quote(password, safe=''))
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    # NOTE(review): verify=False disables TLS certificate verification - confirm intended.
    response = requests.post(url=url,
                             headers=headers,
                             data=payload,
                             verify=False)
    try:
        data = response.json()
    except ValueError:
        # Every JSON decode error raised by response.json() derives from ValueError.
        g_utils.logger.error("Could not get a new token. Received: {0}:{1}"
                             .format(response.status_code,
                                     response.reason))
        return None

    if save:
        with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f:
            f.write(json.dumps(data, indent=4))

    return data.get("access_token", None)
@log_func
def cmd_outgoing_check_if_patient_wit_emr_id_exists(access_token: str,
                                                    url: str) -> requests.Response:
    """
    Asks the backend whether the patient addressed by the url exists

    The raw response is returned without being parsed, so the caller decides
    how to interpret the status code and body.

    :param access_token: The bearer token used for authorization
    :param url: The url of the patient lookup
    :return: requests.Response
    """
    headers = {
        'Authorization': "Bearer {0}".format(access_token),
        'Content-Type': 'application/json'
    }

    # No JSON is decoded here, so no decode-error handler is needed.
    # NOTE(review): verify=False disables TLS certificate verification - confirm intended.
    return requests.get(url=url,
                        headers=headers,
                        verify=False)
@log_func
def cmd_outgoing_validation_result(invalid_attributes: List[str],
                                   hd_serial: str,
                                   g_config: dict) -> dict:
    """
    Sends the validation POST to the manf tool

    :param invalid_attributes: The list of invalid attributes
    :param hd_serial: The hd serial number
    :param g_config: The device config
    :return: The json response, or None when the manufacturing tool url is
             not configured or the reply is not valid JSON
    """
    if (CONFIG_KEBORMED not in g_config or
            CONFIG_KEBORMED_MFT_URL not in g_config[CONFIG_KEBORMED]):
        g_utils.logger.error("Manufacturing tool url not found.")
        return None
    url = urllib.parse.urljoin(g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_MFT_URL],
                               "validation/")

    headers = {
        "device_ip": g_config[CONFIG_DEVICE][CONFIG_DEVICE_IP],
        "device_sn": hd_serial
    }
    # NOTE(review): the empty case sends a JSON string body while the non-empty
    # case sends a form-encoded dict; kept as-is, confirm what the tool expects.
    if not invalid_attributes:
        data = json.dumps({})
    else:
        data = {
            "invalidAttributes": invalid_attributes,
        }
    g_utils.logger.debug("headers: {0}".format(headers))
    g_utils.logger.debug("data: {0}".format(data))
    # NOTE(review): verify=False disables TLS certificate verification - confirm intended.
    resp = requests.post(url=url,
                         data=data,
                         headers=headers,
                         verify=False)
    try:
        return resp.json()
    except ValueError:
        # Every JSON decode error raised by resp.json() derives from ValueError.
        g_utils.logger.error("Could not send validation result. Received: {0}:{1}"
                             .format(resp.status_code,
                                     resp.reason))
        return None
new messages + :return: None + """ + while True: + flag = self.event.wait() + if flag: + while len(self.queue) > 0: + # print('queue size: {0}'.format(len(self.queue))) + message = self.queue.popleft() + self.handle_message(message) + self.event.clear() + + def enqueue_message(self, message: UICSMessage) -> bool: + """ + :param message: the data to add to the queue + :return: True upon success, False otherwise + """ + if len(self.queue) < self.queue.maxlen: + self.queue.append(message) + self.event.set() + return True + else: + return False + + def handle_message(self, message: UICSMessage): + self.logger.info('CS Message: {0}'.format(message)) + + if (InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_REQ_REGISTRATION) and \ + (message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration'): + self.logger.info('UI2CS_REQ_REGISTRATION request received') + self.logger.debug('Config: {0}'.format(message.g_config)) + + message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] = message.parameters[0] + message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] = message.parameters[1] + message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] = message.parameters[2] + message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_NAME] = message.parameters[0] + + helpers_write_config(CONFIG_PATH, message.g_config) + + helpers_add_to_network_queue(network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2MFT_REQ_REGISTRATION request added to network queue') + elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_SEND_CREDENTIALS_SAVED and \ + (message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration'): + self.logger.info('UI2CS_SEND_CREDENTIALS_SAVED request received') + self.remove_secrets() + elif InboundMessageIDs.mapped_str_value(message.ID) == 
InboundMessageIDs.UI2CS_SEND_CREDENTIALS: + self.logger.info('UI2CS_SEND_CREDENTIALS request received') + source_cert = message.parameters[0] + CERTIFICATE_X509_FILE_NAME + source_private_key = message.parameters[0] + PRIVATE_KEY_FILE_NAME + source_public_key = message.parameters[0] + PUBLIC_KEY_FILE_NAME + self.save_secrets(source_cert, source_private_key, source_public_key) + # OPERATION MODE + elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_SEND_DEVICE_STATE and \ + (message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation'): + self.logger.info("UI2CS_SEND_DEVICE_STATE request received") + self.logger.debug("HD Mode: {0}".format(HDOpModes.mapped_str_value(message.parameters[0]))) + self.logger.debug("HD Sub-Mode: {0}".format(HDOpSubModes.mapped_str_value(message.parameters[1]))) + + try: + device_state = helpers_device_state_to_cloud_state( + HDOpModes.mapped_str_value(message.parameters[0]), + HDOpSubModes.mapped_str_value(message.parameters[1])) + except: + device_state = DeviceStates.UNKNOWN_STATE + + message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] = device_state.value + self.logger.info("Device state: {0}".format(device_state.name)) + + helpers_write_config(CONFIG_PATH, message.g_config) + + helpers_add_to_network_queue(network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network queue') + elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_SEND_TREATMENT_REPORT and \ + (message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation'): + self.logger.info("UI2CS_SEND_TREATMENT_REPORT request received") + + hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] + dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] + sw_version = 
class UICSMessage:
    """
    A message received from the UI, parsed from its comma-separated form

    Layout: timestamp,sequence,CRC,ID,size[,parameter...]. The header fields
    are stored as the raw strings found in the message. A message with fewer
    than five fields, or whose size field is not an integer, is treated as
    malformed: every header field is set to 0 and there are no parameters.
    """

    def __init__(self, message_body: str, g_config: dict):
        """
        :param message_body: the raw comma-separated message
        :param g_config: the configuration dictionary attached to the message
        """
        self.g_config = g_config

        message_components = message_body.split(',')

        declared_size = None
        if len(message_components) >= 5:
            try:
                declared_size = int(message_components[4])
            except ValueError:
                # A non-numeric size means the message is corrupt; fall
                # through to the malformed defaults instead of raising.
                declared_size = None

        if declared_size is None:
            self.timestamp = 0
            self.sequence = 0
            self.CRC = 0
            self.ID = 0
            self.size = 0
            self.parameters = []
            return

        self.timestamp = message_components[0]
        self.sequence = message_components[1]
        self.CRC = message_components[2]
        self.ID = message_components[3]
        self.size = message_components[4]
        # Always a list, so positional access fails the same way in every case.
        self.parameters = message_components[5:] if declared_size > 0 else []

    def __str__(self):
        data = {
            "timestamp": self.timestamp,
            "sequence": self.sequence,
            "CRC": self.CRC,
            "ID": self.ID,
            "number_of_parameters": self.size,
            "parameters": self.parameters,
        }
        return str(data)
+CS_LOG_FILE = os.path.join(PATH_HOME, "cloudsync.log") + +# DEVICE TOKEN +DEVICE_KEBORMED_ACCESS_TOKEN_PATH = os.path.join(PATH_CLOUDSYNC, "data/access_token.json") + +# CREDENTIALS +TEMP_CREDENTIALS_CERTIFICATE_X509 = os.path.join(PATH_CLOUDSYNC, "data/client_certificate.pem") +TEMP_CREDENTIALS_PRIVATE_KEY = os.path.join(PATH_CLOUDSYNC, "data/client_private_key.pem") +TEMP_CREDENTIALS_PUBLIC_KEY = os.path.join(PATH_CLOUDSYNC, "data/client_public_key.pem") + +CREDENTIALS_CERTIFICATE_X509 = "/tmp/client_certificate.pem" +CREDENTIALS_PRIVATE_KEY = "/tmp/client_private_key.pem" +CREDENTIALS_PUBLIC_KEY = "/tmp/client_public_key.pem" + +CERTIFICATE_X509_FILE_NAME = "client_certificate.pem" +PRIVATE_KEY_FILE_NAME = "client_private_key.pem" +PUBLIC_KEY_FILE_NAME = "client_public_key.pem" + +# UI2CS VALUES +UI2CS_FILE_CHANNELS_PATH = "/media/sd-card/cloudsync" + + +# TREATMENT REPORT SECTIONS +TITLE = "[Title]" +TREATMENT_PRESCRIPTION = "[Treatment Prescription]" +TREATMENT_PARAMETERS = "[Treatment Parameters]" +POST_TREATMENT_DATA = "[Post-Treatment Data]" +EXTRA = "[Extra]" +TREATMENT_DATA = "[Treatment Data]" +TREATMENT_ALARMS = "[Treatment Alarms]" +TREATMENT_EVENTS = "[Treatment Events]" + +# TREATMENT SECTION CHARACTERS +SECTION_START_CHARACTER = "[" +SECTION_STOP_CHARACTER = "]" + +# UI APP - TREATMENT REPORT VALUES +PATIENT_ID = "Patient ID" +TREATMENT_DURATION = "Treatment Duration" +BLOOD_FLOW_RATE = "Blood Flow Rate" +DIALYSATE_FLOW_RATE = "Dialysate Flow Rate" +ACID_CONCENTRATE_TYPE = "Acid ConcentrateType" +BICARBONATE_CONCENTRATE_TYPE = "Bicarbonate Concentrate Type" +POTASSIUM_CONCENTRATION = "Potassium Concentration" +CALCIUM_CONCENTRATION = "Calcium Concentration" +BICARBONATE_CONCENTRATION = "Bicarbonate Concentration" +SODIUM_CONCENTRATION = "Sodium Concentration" +DIALYSATE_TEMPERATURE = "Dialysate Temperature" +DIALYZER_TYPE = "Dialyzer Type" +HEPARIN_TYPE = "Heparin Type" +HEPARIN_CONCENTRATION = "Heparin Concentration" +HEPARIN_BOLUS_VOLUME = 
"Heparin Bolus Volume" +HEPARIN_DISPENSE_RATE = "Heparin Dispense Rate" +HEPARIN_STOP = "Heparin Stop" +TREATMENT_START_DATE_TIME = "Treatment Start DateTime" +TREATMENT_END_DATE_TIME = "Treatment End DateTime" +ACTUAL_TREATMENT_DURATION = "Actual Treatment Duration" +DIALYSATE_VOLUME_USED = "Dialysate Volume Used" +PRESCRIBED_UF_VOLUME = "Prescribed UF Volume" +TARGET_UF_VOLUME = "Target UF Volume" +ACTUAL_UF_VOLUME = "Actual UF Volume" +PRESCRIBED_UF_RATE = "Prescribed UF Rate" +TARGET_UF_RATE = "Target UF Rate" +ACTUAL_UF_RATE = "Actual UF Rate" +SALINE_BOLUS_VOLUME = "Saline Bolus Volume" +HEPARIN_DELIVERED_VOLUME = "Heparin Delivered Volume" +WATER_SAMPLE_TEST_RESULT = "Water Sample Test Result" + +# TREATMENT TEMPLATE PATH +TREATMENT_REPORT_TEMPLATE_PATH = "cloudsync/config/treatment_report_template.json" Index: cloudsync/utils/heartbeat.py =================================================================== diff -u --- cloudsync/utils/heartbeat.py (revision 0) +++ cloudsync/utils/heartbeat.py (revision 5e2872a15167f8d66d074d106f3afc78d1da3dea) @@ -0,0 +1,30 @@ +from logging import Logger +from threading import Thread +from time import sleep + +from cloudsync.utils.helpers import * +from cloudsync.common.enums import * + +class HeartBeatProvider: + HEARTBEAT_FREQ = 20 + send_heartbeat = False + + def __init__(self, logger: Logger, network_request_handler, output_channel): + self.logger = logger + self.network_request_handler = network_request_handler + self.output_channel = output_channel + self.thread = Thread(target=self.heartbeat, daemon=True) + self.thread.start() + + def heartbeat(self): + """ + Sending heartbeat to UI: requesting device state + """ + while True: + if self.send_heartbeat: + # requesting device state from UI + helpers_add_to_output_channel(output_channel=self.output_channel, + message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', + success_message="CS2UI_REQ_DEVICE_STATE message added to output channel") + + 
def helpers_is_int(val: str) -> bool:
    """
    Determines if the value can be converted to an int

    :param val: the value in string form
    :return: True if can be converted to an int, false otherwise
    """
    try:
        int(val)
    except (ValueError, TypeError):
        # TypeError covers None and other non-string, non-numeric inputs.
        return False
    return True


def helpers_is_float(val: str) -> bool:
    """
    Determines if the value can be converted to a float

    :param val: the value in string form
    :return: True if can be converted to a float, false otherwise
    """
    try:
        float(val)
    except (ValueError, TypeError):
        return False
    return True


def helpers_try_numeric(val: str) -> Union[int, float, str]:
    """
    Tries to convert the value to numeric. If it's not possible
    leaves it as a string

    :param val: the value to convert
    :return: the converted value if possible, otherwise it is returned as is
    """
    if helpers_is_int(val):
        return int(val)
    if helpers_is_float(val):
        return float(val)
    return val


def helpers_parse_treatment_log(path: str) -> dict:
    """
    Converts a treatment.log file to a python dictionary
    The treatment log needs to be formatted better.
    Until then, this is a (non-ideal) way to read it.

    A line containing both "[" and "]" starts a new group named by the text
    between them. Lines in the key/value groups become
    {name: {"value": ..., "units": ...}} entries; lines in the tabular groups
    are appended as rows under the group's "data" key.

    :param path: the path to the treatment log file
    :return: the parsed treatment log file, or {} when the path does not exist
    """
    if not os.path.exists(path):
        print("Path does not exist.")
        return {}

    key_value_groups = ("Title", "Treatment Parameters", "Post-Treatment Data", "Extra")
    tabular_groups = ("Treatment Data", "Treatment Alarms", "Treatment Events")

    result = {}
    group = "NoGroup"
    with open(path, 'r') as f:
        for raw_line in f:
            line = raw_line.replace("\n", "")
            # Both brackets must be present; `"[" and "]" in line` only
            # checked for "]" and crashed on data lines containing it.
            if "[" in line and "]" in line:
                group = line.split("[")[1].split("]")[0]
                if group not in result:
                    result[group] = {}
            elif group in key_value_groups:
                tokens = line.split(",")
                if len(tokens) > 1:
                    entry = {"value": helpers_try_numeric(tokens[1])}
                    if len(tokens) > 2:
                        entry["units"] = None if tokens[2] == "" else tokens[2]
                    result[group][tokens[0]] = entry
            elif group in tabular_groups:
                row = [helpers_try_numeric(token) for token in line.split(",")]
                # Append in place rather than rebuilding the list per row.
                result[group].setdefault("data", []).append(row)

    return result
inactive – UI or cloud will have to maintain assigned tenant, active/inactive and Ok/Not OK while inactive + Inactive OK – N/A + Active OK – N/A + + Active Ready – mode is 3 (standby) and sub-mode is 1 (wait for treatment) + Active In Treatment – mode between 4 and 7 (treatment params .. post treatment) + Active Not Ready – mode/sub-mode is anything other than ready, in-treatment, or not OK + Active Not OK – mode is 0 (fault) + + Decommissioned – N/A – HD f/w will not know if system is decommissioned + """ + if (hd_mode == HDOpModes.MODE_STAN) and (hd_sub_mode == HDOpSubModes.SUBMODE_WAIT_FOR_TREATMENT): + return DeviceStates.ACTIVE_READY + if (hd_mode == HDOpModes.MODE_TPAR) or (hd_mode == HDOpModes.MODE_PRET) or (hd_mode == HDOpModes.MODE_TREA) or ( + hd_mode == HDOpModes.MODE_POST): + return DeviceStates.ACTIVE_IN_TREATMENT + if hd_mode == HDOpModes.MODE_FAUL: + return DeviceStates.ACTIVE_NOT_OK + + return DeviceStates.ACTIVE_NOT_READY + + +def helpers_get_stored_token() -> Union[str, None]: + """ + Returns the stored token + :return: The token if found, otherwise returns None + """ + data = None + print('token path: {0}'.format(DEVICE_KEBORMED_ACCESS_TOKEN_PATH)) + if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH): + return None + with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: + try: + data = json.load(f) + except json.decoder.JSONDecodeError: + return None + + if data is None: + return None + + return data.get("access_token", None) + + +def helpers_read_config(path: str) -> dict: + """ + Read the configuration + :param path: the path to the configuration + :return: the loaded configuration + """ + if os.path.exists(path): + with open(path, 'r') as f: + config = json.load(f) + return config + else: + g_utils.logger.error("Configuration file does not exist: {0}".format(path)) + return {} + + +def helpers_write_config(path: str, config: dict) -> None: + """ + Writes the config to the provided path + + :param path: the path where the config json 
def log_func(func):
    """
    Log the function and the parameters passed to it

    The wrapper keeps the decorated function's name, docstring and other
    metadata, and exposes the original as ``__wrapped__``.

    @param func: The decorated function
    @return: The wrapper function
    """
    import functools

    # NOTE(review): every argument is written to the debug log, including
    # access tokens and passwords passed to decorated functions - confirm
    # this is acceptable.
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        g_utils.logger.debug("Calling {0} args: {1} kwargs: {2}".format(func.__name__,
                                                                        args,
                                                                        kwargs))
        return func(*args, **kwargs)

    return _wrapper
len(treatment_log_lines): + line = treatment_log_lines[counter].strip() + print("line {0}: {1}".format(counter, line)) + print("size of log: {0}".format(len(treatment_log_lines))) + + if line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER) and counter < ( + len(treatment_log_lines) - 2): + section = line + section_lines = [] + counter += 1 + section_start_counter = counter + line = treatment_log_lines[counter].strip() + while not (line.startswith(SECTION_START_CHARACTER) and line.endswith( + SECTION_STOP_CHARACTER)) and counter < len(treatment_log_lines) - 1: + section_lines.append(line) + counter += 1 + line = treatment_log_lines[counter].strip() + if len(section_lines) > 0: + if section == TREATMENT_DATA: + for data_line in section_lines: + data_components = data_line.split(',') + data_record = { + "time": int(data_components[0]) * 1000, + "bloodFlowRate": float(data_components[1]), + "dialysateFlowRate": float(data_components[2]), + "ultrafiltrationRate": float(data_components[3]), + "arterialPressure": float(data_components[4]), + "venousPressure": float(data_components[5]), + "systolic": float(data_components[6]), + "diastolic": float(data_components[7]), + "heartRate": float(data_components[8]) + } + treatment_data['data']['treatment']['data'].append(data_record) + elif section == TREATMENT_ALARMS: + for data_line in section_lines: + data_components = data_line.split(',') + parameters = [] + if len(data_components) > 2: + parameters = data_components[2:(len(data_components) - 2)] + data_record = { + "time": int(data_components[0]) * 1000, + "title": data_components[1], + "parameters": parameters + } + treatment_data['data']['treatment']['alarms'].append(data_record) + elif section == TREATMENT_EVENTS: + for data_line in section_lines: + data_components = data_line.split(',') + parameters = [] + if len(data_components) > 2: + parameters = data_components[2:(len(data_components) - 2)] + data_record = { + "time": 
int(data_components[0]) * 1000, + "title": data_components[1], + "parameters": parameters + } + treatment_data['data']['treatment']['events'].append(data_record) + else: + counter = section_start_counter + + else: + print('regular line') + if line.startswith(PATIENT_ID): + treatment_data['data']['patient']['id'] = line.split(',')[1] + elif line.startswith(TREATMENT_DURATION): + treatment_data['data']['treatment']['parameters']['treatmentDuration'] = int(line.split(',')[1]) + elif line.startswith(BLOOD_FLOW_RATE): + treatment_data['data']['treatment']['parameters']['bloodFlowRate'] = int(line.split(',')[1]) + elif line.startswith(DIALYSATE_FLOW_RATE): + treatment_data['data']['treatment']['parameters']['dialysateFlowRate'] = int(line.split(',')[1]) + elif line.startswith(ACID_CONCENTRATE_TYPE): + treatment_data['data']['treatment']['parameters']['dialysate']['acidCode'] = line.split(',')[1] + elif line.startswith(BICARBONATE_CONCENTRATE_TYPE): + treatment_data['data']['treatment']['parameters']['dialysate']['bicarbCode'] = line.split(',')[1] + elif line.startswith(POTASSIUM_CONCENTRATION): + treatment_data['data']['treatment']['parameters']['dialysate']['K'] = float(line.split(',')[1]) + elif line.startswith(CALCIUM_CONCENTRATION): + treatment_data['data']['treatment']['parameters']['dialysate']['Ca'] = float(line.split(',')[1]) + elif line.startswith(BICARBONATE_CONCENTRATION): + treatment_data['data']['treatment']['parameters']['dialysate']['HCO3'] = float(line.split(',')[1]) + elif line.startswith(SODIUM_CONCENTRATION): + treatment_data['data']['treatment']['parameters']['dialysate']['Na'] = float(line.split(',')[1]) + elif line.startswith(DIALYSATE_TEMPERATURE): + treatment_data['data']['treatment']['parameters']['dialysateTemp'] = float(line.split(',')[1]) + elif line.startswith(DIALYZER_TYPE): + treatment_data['data']['treatment']['parameters']['dialyzerModel'] = line.split(',')[1] + elif line.startswith(HEPARIN_TYPE): + 
treatment_data['data']['treatment']['parameters']['heparinType'] = line.split(',')[1] + elif line.startswith(HEPARIN_CONCENTRATION): + treatment_data['data']['treatment']['parameters']['heparinConcentration'] = float(line.split(',')[1]) + elif line.startswith(HEPARIN_BOLUS_VOLUME): + treatment_data['data']['treatment']['parameters']['heparinBolus'] = float(line.split(',')[1]) + elif line.startswith(HEPARIN_DISPENSE_RATE): + treatment_data['data']['treatment']['parameters']['heparinRate'] = float(line.split(',')[1]) + elif line.startswith(HEPARIN_STOP): + treatment_data['data']['treatment']['parameters']['heparinStopBeforeTreatmentEnd'] = int( + line.split(',')[1]) + elif line.startswith(TREATMENT_START_DATE_TIME): + element = datetime.datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M") + timestamp = datetime.datetime.timestamp(element) + treatment_data['data']['treatment']['time']['start'] = int(timestamp) * 1000 + elif line.startswith(TREATMENT_END_DATE_TIME): + element = datetime.datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M") + timestamp = datetime.datetime.timestamp(element) + treatment_data['data']['treatment']['time']['end'] = int(timestamp) * 1000 + elif line.startswith(ACTUAL_TREATMENT_DURATION): + treatment_data['data']['treatment']['time']['treatmentDuration'] = int(line.split(',')[1]) + elif line.startswith(DIALYSATE_VOLUME_USED): + treatment_data['data']['deviceTreatmentData']['dialysateVolumeUsed'] = float(line.split(',')[1]) + elif line.startswith(PRESCRIBED_UF_VOLUME): + treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationVolume'] = float( + line.split(',')[1]) + elif line.startswith(TARGET_UF_VOLUME): + treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationVolume'] = float( + line.split(',')[1]) + elif line.startswith(ACTUAL_UF_VOLUME): + treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationVolume'] = float(line.split(',')[1]) + elif line.startswith(PRESCRIBED_UF_RATE): + 
treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationRate'] = float( + line.split(',')[1]) + elif line.startswith(TARGET_UF_RATE): + treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationRate'] = float( + line.split(',')[1]) + elif line.startswith(ACTUAL_UF_RATE): + treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationRate'] = float(line.split(',')[1]) + elif line.startswith(SALINE_BOLUS_VOLUME): + treatment_data['data']['deviceTreatmentData']['salineBolusVolumeGiven'] = float(line.split(',')[1]) + elif line.startswith(HEPARIN_DELIVERED_VOLUME): + treatment_data['data']['deviceTreatmentData']['heparinDelivered'] = float(line.split(',')[1]) + elif line.startswith(WATER_SAMPLE_TEST_RESULT): + treatment_data['data']['diagnostics']['waterSampleTestResult'] = line.split(',')[1] + counter += 1 + + return treatment_data + + +def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config, + success_message): + + cycle_duration = 10 + total_cycles = 3 + cycle = 0 + + while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles): + sleep(cycle_duration) + cycle += 1 + + if g_utils.reachability_provider.reachability: + r = NetworkRequest(request_type=request_type, + url=url, + payload=payload, + method=method, + g_config=g_config) + request_added_to_queue = False + while not request_added_to_queue: + request_added_to_queue = network_request_handler.enqueue_request(r) + + g_utils.logger.info(success_message) + else: + g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name)) + + +def helpers_add_to_output_channel(output_channel, message_body, success_message): + + message_added_to_queue = False + while not message_added_to_queue: + message_added_to_queue = output_channel.enqueue_message(message_body) + + g_utils.logger.info(success_message) + Index: cloudsync/utils/reachability.py 
REACHABILITY_URL = "https://google.com"
REACHABILITY_FREQ = 5
# Upper bound, in seconds, for one probe; without it a stalled connection would block the
# monitoring thread and freeze the reachability flag at its last value.
REACHABILITY_TIMEOUT = 10


class ReachabilityProvider:
    """
    Background monitor that keeps the `reachability` flag in sync with the state of the
    connection to REACHABILITY_URL.

    The daemon monitoring thread is started by the constructor.
    """

    def __init__(self, logger: Logger):
        """
        :param logger: the logger that receives the 'Internet UP' / 'Internet DOWN' messages
        """
        self.logger = logger
        self.reachability = False
        self.thread = Thread(target=self.reachability_test, daemon=True)
        self.thread.start()

    def reachability_test(self):
        """
        Continuously monitors the connection to REACHABILITY_URL

        Probes the url, updates the flag, then sleeps REACHABILITY_FREQ seconds; never returns.
        """
        while True:
            self._update_reachability(self._probe())
            sleep(REACHABILITY_FREQ)

    def _probe(self) -> bool:
        """
        Send one GET request to REACHABILITY_URL

        :return: True when the response status code is 200, False for any other status code or
                 when the request fails or exceeds REACHABILITY_TIMEOUT
        """
        try:
            response = requests.get(url=REACHABILITY_URL, timeout=REACHABILITY_TIMEOUT)
        except requests.exceptions.RequestException:
            return False
        return response.status_code == 200

    def _update_reachability(self, reachable: bool) -> None:
        """
        Store the new state, logging only when it differs from the previous one

        :param reachable: the outcome of the latest probe
        :return: None
        """
        if reachable and not self.reachability:
            self.logger.info('Internet UP')
        elif self.reachability and not reachable:
            self.logger.warning('Internet DOWN')
        self.reachability = reachable
arguments = sys.argv

logging_level = logging.INFO

# Accepted names for the optional logging level argument
LOGGING_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
}

USAGE = "Usage: python cs.py {start|stop|restart|status} [debug|info|warning|error]"


def get_pid():
    """
    Look up the process id of the running CloudSync app

    :return: the pid (as a string) of the first `ps ax` entry mentioning cloud_sync.py,
             or None when there is no such entry
    """
    # the context manager closes the pipe instead of leaving it to the garbage collector
    with os.popen("ps ax | grep cloud_sync.py | grep -v grep") as pipe:
        proc_list = pipe.readlines()
    if proc_list:
        return proc_list[0].split()[0]
    return None


def start():
    """
    Start cloud_sync.py as a child process with the current logging level, unless it already runs

    :return: None
    """
    if get_pid():
        print("CloudSync app already running")
        return
    print("Starting CloudSync app with logging level {0}".format(logging_level))
    time.sleep(0.5)
    Popen(['python3', 'cloud_sync.py', str(logging_level)])


def stop():
    """
    Kill the running CloudSync app with SIGKILL, if there is one

    :return: None
    """
    cs_proc_pid = get_pid()
    if not cs_proc_pid:
        print("CloudSync app is not running.")
        return
    print("Stopping CloudSync app...")
    time.sleep(0.5)
    os.kill(int(cs_proc_pid), signal.SIGKILL)


def main(argv):
    """
    Run the command given on the command line

    :param argv: the argument list: script name, command (start, stop, restart or status) and an
                 optional logging level name (debug, info, warning or error)
    :return: None
    """
    global logging_level

    if len(argv) not in (2, 3):
        print(USAGE)
        return

    if len(argv) == 3:
        level = LOGGING_LEVELS.get(argv[2])
        if level is None:
            # an unknown level is a usage error: do not run the command with a level
            # the caller did not ask for
            print(USAGE)
            return
        logging_level = level

    command = argv[1]
    if command == "start":
        start()
    elif command == "stop":
        stop()
    elif command == "restart":
        print("Restarting CloudSync app...")
        time.sleep(0.5)
        stop()
        # NOTE(review): start() runs right after the kill signal is sent; if the old process is
        # still listed by ps at that moment start() reports it as already running - confirm
        start()
    elif command == "status":
        if get_pid():
            print("CloudSync app IS running")
        else:
            print("CloudSync app IS NOT running")
    else:
        print(USAGE)


if __name__ == "__main__":
    main(arguments)