#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 远程显示 Web 服务 - Flask + WebSocket 实现 通过 RemoDispBus 协议与 DTU 装置通信,将远程 LCD 画面以 Web 页面形式展示。 前端通过 WebSocket 连接,服务端主动推送屏幕数据,实现低延迟传输。 用法: python remo_disp_server.py [装置IP] 启动后访问 http://localhost:8181 依赖: pip install flask flask-socketio pillow loguru 日志级别: 在代码中设置 LOG_LEVEL(见下方),"INFO" 不显示 DEBUG,"DEBUG" 显示调试信息。 """ # base64:将二进制 PNG 编码为字符串,便于通过 JSON/WebSocket 传输 import base64 # socket:与 DTU 装置的 TCP 通信 import socket # struct:将整数打包为二进制(如起始地址 4 字节) import struct # threading:后台线程持续拉取屏幕并推送 import threading # sys:读取命令行参数(装置 IP) import sys # os:获取当前脚本目录、路径与文件检查 import os # io:BytesIO 用于在内存中保存 PNG import io def _get_base_path() -> str: """打包成 exe 时资源在 sys._MEIPASS,否则为脚本所在目录。""" if getattr(sys, "frozen", False): return sys._MEIPASS # type: ignore[attr-defined] return os.path.dirname(os.path.abspath(__file__)) # typing:类型注解 from typing import Optional, Tuple, Set # loguru:结构化日志,记录错误与调试信息 from loguru import logger # Flask:Web 框架,提供 HTTP 路由 from flask import Flask, request, send_file # SocketIO:WebSocket 支持,实现实时双向通信 from flask_socketio import SocketIO, emit # ============================================================================= # 协议常量(RemoDispBus) # ============================================================================= # TAG_CLIENT 0xAA:客户端发送帧的帧头标识 # TAG_DEVICE 0xBB:设备回复帧的帧头标识 TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB # PORT 7003:RemoDispBus 协议默认端口 PORT = 7003 # CMD_KEEPLIVE 0:保活 # CMD_INIT 1:初始化,获取屏幕宽高、显存大小 # CMD_KEY 2:按键 # CMD_LCDMEM 3:读取显存(屏幕画面) CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3 # ============================================================================= # 日志级别(在代码中直接修改) # ============================================================================= # "INFO":仅输出 INFO/WARNING/ERROR,不显示 DEBUG # "DEBUG":输出包括 DEBUG 在内的全部日志 LOG_LEVEL = "INFO" # ============================================================================= # 全局状态 # ============================================================================= # _sock:与 DTU 装置的 TCP socket,None 表示未连接 _sock: Optional[socket.socket] = None # _screen_clients:当前需要接收屏幕推送的 WebSocket 客户端 sid 集合 _screen_clients: Set[str] = set() # _refresh_thread:后台刷新线程,持续拉取屏幕并推送 _refresh_thread: Optional[threading.Thread] = None # _refresh_stop:Event,set 时刷新线程退出 _refresh_stop = threading.Event() # ============================================================================= # 协议帧编解码 # ============================================================================= def calc_crc(data: bytes) -> int: """ 对数据区逐字节异或,取低 8 位作为 CRC 校验码。 协议规定:CRC = data[0] ^ data[1] ^ ... ^ data[n-1],取低 8 位。 """ crc = 0 # 初始值为 0 for b in data: crc ^= b # 逐字节异或 return crc & 0xFF # 取低 8 位作为最终 CRC def build_frame(cmd: int, data: bytes) -> bytes: """ 构造一帧发送数据,格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data][crc(1)] length 大端 2 字节;crc 对 data 计算。 """ length = len(data) # 数据区长度 # 构造帧头:TAG_CLIENT + cmd + 长度高字节 + 长度低字节(大端序) header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) return header + data + bytes([calc_crc(data)]) # 帧头 + 数据 + CRC def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: """ 解析设备回复帧,返回 (cmd, data) 或 None。 帧格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data(length)][crc(1)] 最短合法帧长度为 5 字节(长度为 0 时: 1+1+2+0+1)。 """ if len(raw) < 5 or raw[0] != TAG_DEVICE: # 至少 5 字节且帧头为 TAG_DEVICE logger.error( "[parse_frame] 帧太短或帧头不是 TAG_DEVICE(0xBB), len={len_}", len_=len(raw), ) return None length = (raw[2] << 8) | raw[3] # 大端序解析数据区长度 if len(raw) < 4 + length + 1: # 检查是否收齐整帧(头4字节+数据+CRC1字节) logger.error( "[parse_frame] 数据不完整, 需要 {need} 字节, 实际 {actual} 字节", need=4 + length + 1, actual=len(raw), ) return None data = raw[4:4 + length] # 提取数据区 if calc_crc(data) != raw[4 + length]: # CRC 校验失败 logger.error("[parse_frame] CRC 校验失败") return None cmd = raw[1] logger.debug("[parse_frame] 成功解析帧 cmd={cmd:#x}, length={length}", cmd=cmd, length=length) return (cmd, data) # 返回 (命令码, 数据) # ============================================================================= # 连接与收发 # ============================================================================= def connect(host: str, port: int = PORT) -> bool: """ 建立与 DTU 装置的 TCP 连接。 若有旧连接则先关闭;连接时超时 3 秒;连接成功后设为 2 秒。 """ global _sock try: if _sock: _sock.close() # 关闭旧连接 logger.info("[connect] 已关闭旧连接") _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 TCP socket _sock.settimeout(3.0) # 连接阶段超时 3 秒 _sock.connect((host, port)) # 连接目标主机和端口 _sock.settimeout(2.0) # 连接成功后收发超时 2 秒 logger.info("[connect] 连接 DTU 成功: {host}:{port}", host=host, port=port) return True except Exception as e: logger.exception("[connect] 连接失败: {host}:{port} - {err}", host=host, port=port, err=e) return False def _send(cmd: int, data: bytes = b"") -> bool: """发送一帧到设备""" if not _sock: logger.error("[_send] 未连接设备,无法发送 cmd={cmd:#x}", cmd=cmd) return False try: frame = build_frame(cmd, data) _sock.sendall(frame) # 构造帧并一次性发送 logger.debug("[_send] 已发送 cmd={cmd:#x}, data_len={length}", cmd=cmd, length=len(data)) return True except Exception as e: logger.exception("[_send] 发送失败 cmd={cmd:#x} - {err}", cmd=cmd, err=e) return False def _recv() -> Optional[Tuple[int, bytes]]: """ 从设备接收一帧。若首次接收长度不足,会再尝试接收最多 2 次(共 3 次), 拼齐整帧后再解析。单次 recv 最多 256KB。 """ if not _sock: logger.error("[_recv] 未连接设备,无法接收数据") return None try: buffer = b"" max_attempts = 3 # 首次 + 最多再收 2 次 for attempt in range(max_attempts): chunk = _sock.recv(256 * 1024) if not chunk: logger.debug("[_recv] 第 {n} 次 recv 收到空数据,连接可能已关闭", n=attempt + 1) break buffer += chunk if len(buffer) < 5: # 至少需要 5 字节才能解析头 logger.debug("[_recv] 第 {n} 次 recv 后共 {len_} 字节,继续接收", n=attempt + 1, len_=len(buffer)) continue data_len = (buffer[2] << 8) | buffer[3] # 数据区长度 need = 4 + data_len + 1 # 整帧长度 if len(buffer) >= need: logger.debug("[_recv] 第 {n} 次 recv 后收齐整帧 len={len_}", n=attempt + 1, len_=need) return parse_frame(buffer[:need]) logger.debug( "[_recv] 第 {n} 次 recv 后共 {have} 字节,需要 {need} 字节,继续接收", n=attempt + 1, have=len(buffer), need=need, ) if len(buffer) < 5: logger.error("[_recv] 接收数据太短, len={len_}", len_=len(buffer)) return None need = 4 + ((buffer[2] << 8) | buffer[3]) + 1 logger.error( "[_recv] 尝试 {max_attempts} 次后数据仍不完整, 需要 {need} 字节, 实际 {actual} 字节", max_attempts=max_attempts, need=need, actual=len(buffer), ) return None except Exception as e: logger.exception("[_recv] 接收异常 - {err}", err=e) return None def send_key(code: int) -> bool: """发送按键码到设备""" return _send(CMD_KEY, bytes([code & 0xFF])) # 按键码单字节 def fetch_screen() -> Optional[bytes]: """ 向设备请求当前屏幕显存,返回 1bpp 位图数据。 发送 CMD_LCDMEM + 起始地址 0;设备回复 payload 前 4 字节为地址,后续为位图,返回 payload[4:]。 """ if not _sock: logger.error("[fetch_screen] 未连接设备,无法读取屏幕") return None if not _send(CMD_LCDMEM, struct.pack(">I", 0)): # 发送读取显存命令,起始地址 0(大端 4 字节) logger.error("[fetch_screen] 发送 CMD_LCDMEM 失败") return None logger.debug("[fetch_screen] 发送 CMD_LCDMEM 成功") result = _recv() # 接收设备回复 if not result: logger.error("[fetch_screen] 未收到任何回复(result=None)") return None cmd, payload = result if cmd != CMD_LCDMEM: # 可能是按键 ACK、保活等其他短帧,按键操作时较常见,这里只做调试日志,不视为错误 logger.debug("[fetch_screen] 收到非 LCDMEM 帧 cmd={cmd:#x},忽略", cmd=cmd) return None # 确认为 LCDMEM 回复 if len(payload) >= 4: # 前 4 字节为地址,后面是位图 logger.debug("[fetch_screen] 收到 LCDMEM,payload_len={len_}", len_=len(payload)) return payload[4:] logger.debug("[fetch_screen] 收到 LCDMEM,payload_len={len_} (<4)", len_=len(payload)) return payload or None def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]: """ 将 1bpp 位图转为 PNG 字节流。 每字节 8 像素,高位在前;bi=(y*w+x)//8 为字节索引,bit=7-(x%8) 为位索引;1 为白 255,0 为黑 0。 """ try: from PIL import Image logger.debug( "[mono_to_png] 开始转换为 PNG,width={width}, height={height}, data_len={len_}", width=width, height=height, len_=len(data), ) img = Image.new("1", (width, height)) # 创建 1bpp 黑白图像 pix = img.load() # 获取像素访问对象 ''' for y in range(height): for x in range(width): bi = (y * width + x) // 8 # 字节索引:每 8 像素一字节 bit = 7 - (x % 8) # 位索引:高位在前 pix[x, y] = 0 if (bi < len(data) and (data[bi] >> bit) & 1) else 255 # 1→白 0→黑 ''' for y in range(height): for x in range(width): pix[x, y] = data[y * width + x] buf = io.BytesIO() # 内存缓冲区 img.save(buf, format="PNG") # 保存为 PNG 格式 png_bytes = buf.getvalue() logger.debug("[mono_to_png] 转换 PNG 成功, png_len={len_}", len_=len(png_bytes)) return png_bytes # 返回 PNG 字节流 except ImportError: logger.error("[mono_to_png] 缺少 PIL/Pillow, 请执行 pip install pillow") return None except Exception as e: logger.exception("[mono_to_png] 转换 PNG 失败 - {err}", err=e) return None def disconnect_device(): """关闭与设备的连接""" global _sock try: if _sock: logger.info("[disconnect_device] 正在关闭与 DTU 的连接") _sock.close() # 关闭 socket _sock = None # 清空引用 except Exception as e: logger.exception("[disconnect_device] 关闭连接时异常 - {err}", err=e) # ============================================================================= # 屏幕推送线程 # ============================================================================= def _screen_refresh_loop(): """ 后台线程:每约 100ms 拉取一帧屏幕,转 PNG 后 base64 推送给各 WebSocket 客户端。 若检测到远程服务端断开(连续多次拉取失败),则断开设备并停止重复请求。 _refresh_stop.wait(0.1) 既作间隔,也便于收到 set 时快速退出。 """ global _screen_clients, _refresh_stop logger.info("[_screen_refresh_loop] 刷新线程启动") # 连续拉取失败次数,超过阈值则认为远程已断开 MAX_CONSECUTIVE_FAILURES = 5 consecutive_failures = 0 try: while not _refresh_stop.is_set(): # 未被要求停止时持续循环 if _screen_clients and _sock: # 有客户端且已连接设备 data = fetch_screen() # 拉取屏幕位图 logger.debug("[fetch_screen] 拉取屏幕位图数据: {data}", data=data) if data: consecutive_failures = 0 # 成功则清零 # 当前协议未返回宽高,这里固定为 160x160,可根据实际情况调整 w, h = 160, 160 png = mono_to_png(data, w, h) # 转为 PNG if png: b64 = base64.b64encode(png).decode("ascii") # base64 编码便于 JSON 传输 for sid in list(_screen_clients): # 复制列表避免迭代时修改 try: socketio.emit( "screen", {"png": b64, "w": w, "h": h}, room=sid, ) # 推送给该客户端 except Exception as e: logger.exception( "[_screen_refresh_loop] 推送 screen 给 {sid} 失败 - {err}", sid=sid, err=e, ) else: # 拉取失败,累计连续失败次数 consecutive_failures += 1 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: logger.warning( "[_screen_refresh_loop] 连续 {n} 次拉取失败,判定远程服务端已断开,停止请求并断开连接", n=consecutive_failures, ) disconnect_device() # 通知所有订阅客户端:设备已断开 for sid in list(_screen_clients): try: socketio.emit("device_disconnected", {"reason": "remote_closed"}, room=sid) except Exception as e: logger.debug("[_screen_refresh_loop] 推送 device_disconnected 失败 sid={sid} - {err}", sid=sid, err=e) consecutive_failures = 0 else: consecutive_failures = 0 # 无客户端或未连接时清零,便于下次连接后重新计数 _refresh_stop.wait(timeout=0.1) # 等待 100ms,若被 set 则立即返回 finally: logger.info("[_screen_refresh_loop] 刷新线程退出") # ============================================================================= # Flask + SocketIO # ============================================================================= app = Flask(__name__) # 创建 Flask 应用 app.config["SECRET_KEY"] = "remo_disp" # 会话密钥 socketio = SocketIO(app, cors_allowed_origins="*") # 创建 SocketIO,允许跨域 FAVICON_PATH = os.path.join(_get_base_path(), "static", "favicon.ico") # favicon 文件路径 @app.route("/favicon.ico") def favicon(): """提供网站图标""" if os.path.exists(FAVICON_PATH): return send_file(FAVICON_PATH, mimetype="image/x-icon") return "", 404 # 不存在则返回 404 @app.route("/") @app.route("/index.html") def index(): """主页,返回前端 HTML""" return send_file( os.path.join(_get_base_path(), "remo_disp_ui.html"), mimetype="text/html; charset=utf-8", ) @socketio.on("connect") def on_connect(): """WebSocket 客户端连接时触发(可留空)""" logger.info("[on_connect] WebSocket 客户端已连接 sid={sid}", sid=request.sid) @socketio.on("disconnect") def on_disconnect(): """WebSocket 客户端断开(关标签页等)时,从 _screen_clients 移除,若无客户端则断开设备""" global _screen_clients, _refresh_thread, _refresh_stop sid = request.sid # 获取断开客户端的 session id logger.info("[on_disconnect] WebSocket 客户端断开 sid={sid}", sid=sid) if sid in _screen_clients: _screen_clients.discard(sid) # 从订阅列表移除 if not _screen_clients: # 若无其他客户端 _refresh_stop.set() # 通知刷新线程停止 disconnect_device() # 断开设备连接 @socketio.on("connect_device") def on_connect_device(data): """前端请求连接设备:连接成功后加入 _screen_clients,启动刷新线程,回复 connect_result""" global _screen_clients, _refresh_thread, _refresh_stop host = (data.get("host") or "").strip() # 从 data 取 host,去空格 port = int(data.get("port") or PORT) # 从 data 取 port,缺省 7003 logger.info("[on_connect_device] 请求连接 DTU: {host}:{port}", host=host, port=port) ok = connect(host, port) # 建立 TCP 连接 if ok: _screen_clients.add(request.sid) # 将当前客户端加入屏幕订阅 _refresh_stop.clear() # 清除停止标志,允许刷新线程运行 if _refresh_thread is None or not _refresh_thread.is_alive(): # 刷新线程未运行 _refresh_thread = threading.Thread(target=_screen_refresh_loop, daemon=True) # 创建后台线程 _refresh_thread.start() # 启动线程 logger.info("[on_connect_device] 已启动屏幕刷新线程") emit("connect_result", {"success": True}) # 回复连接成功 logger.info("[on_connect_device] 连接 DTU 成功,已加入订阅 sid={sid}", sid=request.sid) else: logger.error("[on_connect_device] 连接 DTU 失败: {host}:{port}", host=host, port=port) emit("connect_result", {"success": False, "error": "connect failed"}) # 回复连接失败 @socketio.on("disconnect_device") def on_disconnect_device(): """前端请求断开设备:从 _screen_clients 移除,若无客户端则断开设备,回复 disconnect_result""" global _screen_clients, _refresh_stop sid = request.sid logger.info("[on_disconnect_device] 前端请求断开设备 sid={sid}", sid=sid) _screen_clients.discard(sid) # 从订阅列表移除 if not _screen_clients: # 若无其他客户端 _refresh_stop.set() # 通知刷新线程停止 disconnect_device() # 断开设备连接 emit("disconnect_result", {"ok": True}) # 回复断开成功 logger.info("[on_disconnect_device] 设备已断开(若为最后一个客户端)") @socketio.on("key") def on_key(data): """前端发送按键:从 data 取 code,转发给设备""" code = int(data.get("code", 0)) # 按键码,缺省 0 logger.debug("[on_key] 收到按键 code={code:#x}", code=code) send_key(code) # 发送给 DTU 设备 # ============================================================================= # 启动 # ============================================================================= def _configure_log_level(): """根据全局 LOG_LEVEL 配置日志级别(见文件顶部 LOG_LEVEL 常量)。""" level = LOG_LEVEL.upper() if isinstance(LOG_LEVEL, str) else "INFO" if level not in ("TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"): level = "INFO" logger.remove() # 移除默认的 stderr 输出 logger.add(sys.stderr, level=level, format="{time:HH:mm:ss} | {level: <8} | {message}") logger.add( "remo_disp.log", rotation="1 week", encoding="utf-8", enqueue=True, backtrace=True, diagnose=False, level=level, ) return level @logger.catch def main(): """主入口:启动 Web 服务""" port = 8181 # HTTP 服务端口 log_level = _configure_log_level() logger.info("日志级别: {level}", level=log_level) logger.info("远程显示服务: http://localhost:{port}", port=port) logger.info("使用 WebSocket 传输,在浏览器中打开上述地址") if len(sys.argv) >= 2: host = sys.argv[1] logger.info("[main] 尝试预连接 DTU: {host}", host=host) if connect(host): # 若命令行有 IP,预连接 logger.info("[main] 已预连接: {host}", host=host) else: logger.error("[main] 预连接失败: {host}", host=host) socketio.run(app, host="0.0.0.0", port=port, debug=True, allow_unsafe_werkzeug=True) # 监听所有网卡 if __name__ == "__main__": main()