diff --git a/remo_disp_server.py b/remo_disp_server.py index ae56f06..bb79dbb 100644 --- a/remo_disp_server.py +++ b/remo_disp_server.py @@ -1,215 +1,341 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -远程显示 Web 服务 - Flask 实现 +远程显示 Web 服务 - Flask + WebSocket 实现 + +通过 RemoDispBus 协议与 DTU 装置通信,将远程 LCD 画面以 Web 页面形式展示。 +前端通过 WebSocket 连接,服务端主动推送屏幕数据,实现低延迟传输。 用法: python remo_disp_server.py [装置IP] 启动后访问 http://localhost:8181 -依赖: pip install flask +依赖: pip install flask flask-socketio pillow """ +# base64:将二进制 PNG 编码为字符串,便于通过 JSON/WebSocket 传输 +import base64 +# socket:与 DTU 装置的 TCP 通信 import socket +# struct:将整数打包为二进制(如起始地址 4 字节) import struct -import time +# threading:后台线程持续拉取屏幕并推送 +import threading +# sys:读取命令行参数(装置 IP) import sys +# os:获取当前脚本目录,拼接 favicon 路径 import os +# io:BytesIO 用于在内存中保存 PNG import io -from typing import Optional, Tuple +# typing:类型注解 +from typing import Optional, Tuple, Set -from flask import Flask, request, jsonify, send_file, Response +# Flask:Web 框架,提供 HTTP 路由 +from flask import Flask, request, send_file +# SocketIO:WebSocket 支持,实现实时双向通信 +from flask_socketio import SocketIO, emit -# 协议常量 +# ============================================================================= +# 协议常量(RemoDispBus) +# ============================================================================= +# TAG_CLIENT 0xAA:客户端发送帧的帧头标识 +# TAG_DEVICE 0xBB:设备回复帧的帧头标识 TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB +# PORT 7003:RemoDispBus 协议默认端口 PORT = 7003 +# CMD_KEEPLIVE 0:保活 +# CMD_INIT 1:初始化,获取屏幕宽高、显存大小 +# CMD_KEY 2:按键 +# CMD_LCDMEM 3:读取显存(屏幕画面) CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3 -# 全局连接 +# ============================================================================= +# 全局状态 +# ============================================================================= +# _sock:与 DTU 装置的 TCP socket,None 表示未连接 _sock: Optional[socket.socket] = None +# _init_info:初始化信息(width/height/mem_size),由 CMD_INIT 获取 _init_info: Optional[dict] = None +# _screen_clients:当前需要接收屏幕推送的 WebSocket 客户端 sid 集合 +_screen_clients: Set[str] = set() +# _refresh_thread:后台刷新线程,持续拉取屏幕并推送 +_refresh_thread: Optional[threading.Thread] = None +# _refresh_stop:Event,set 时刷新线程退出 +_refresh_stop = threading.Event() +# ============================================================================= +# 协议帧编解码 +# ============================================================================= + def calc_crc(data: bytes) -> int: - crc = 0 + """ + 对数据区逐字节异或,取低 8 位作为 CRC 校验码。 + 协议规定:CRC = data[0] ^ data[1] ^ ... ^ data[n-1],取低 8 位。 + """ + crc = 0 # 初始值为 0 for b in data: - crc ^= b - return crc & 0xFF + crc ^= b # 逐字节异或 + return crc & 0xFF # 取低 8 位作为最终 CRC def build_frame(cmd: int, data: bytes) -> bytes: - length = len(data) + """ + 构造一帧发送数据,格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data][crc(1)] + length 大端 2 字节;crc 对 data 计算。 + """ + length = len(data) # 数据区长度 + # 构造帧头:TAG_CLIENT + cmd + 长度高字节 + 长度低字节(大端序) header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) - return header + data + bytes([calc_crc(data)]) + return header + data + bytes([calc_crc(data)]) # 帧头 + 数据 + CRC def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: - if len(raw) < 6 or raw[0] != TAG_DEVICE: + """ + 解析设备回复帧,返回 (cmd, data) 或 None。 + 帧格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data(length)][crc(1)] + """ + if len(raw) < 6 or raw[0] != TAG_DEVICE: # 至少 6 字节且帧头为 TAG_DEVICE return None - length = (raw[2] << 8) | raw[3] - if len(raw) < 5 + length + 1: + length = (raw[2] << 8) | raw[3] # 大端序解析数据区长度 + if len(raw) < 4 + length + 1: # 检查是否收齐整帧(头4字节+数据+CRC1字节) return None - data = raw[4:4 + length] - if calc_crc(data) != raw[4 + length]: + data = raw[4:4 + length] # 提取数据区 + if calc_crc(data) != raw[4 + length]: # CRC 校验失败 return None - return (raw[1], data) + return (raw[1], data) # 返回 (命令码, 数据) +# ============================================================================= +# 连接与收发 +# ============================================================================= + def connect(host: str, port: int = PORT) -> bool: + """ + 建立与 DTU 装置的 TCP 连接。 + 若有旧连接则先关闭;连接时超时 3 秒;连接成功后设为 2 秒。 + """ global _sock, _init_info try: if _sock: - _sock.close() - _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - _sock.settimeout(5.0) - _sock.connect((host, port)) - _sock.settimeout(2.0) - _init_info = _request_init() + _sock.close() # 关闭旧连接 + _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 TCP socket + _sock.settimeout(3.0) # 连接阶段超时 3 秒 + _sock.connect((host, port)) # 连接目标主机和端口 + _sock.settimeout(2.0) # 连接成功后收发超时 2 秒 return True except Exception: return False def _send(cmd: int, data: bytes = b"") -> bool: + """发送一帧到设备""" if not _sock: return False try: - _sock.sendall(build_frame(cmd, data)) + _sock.sendall(build_frame(cmd, data)) # 构造帧并一次性发送 return True except Exception: return False def _recv() -> Optional[Tuple[int, bytes]]: + """ + 从设备接收一帧,一次性 recv 最多 256KB。 + 至少需 5 字节(4 字节头+1 字节数据或 CRC);根据头中 length 检查是否收齐整帧。 + """ if not _sock: return None try: - header = _sock.recv(5) - if len(header) < 5: + data = _sock.recv(256 * 1024) # 单次接收最多 256KB + if len(data) < 5: # 至少需要 5 字节(4 头 + 1 数据/CRC) return None - length = (header[2] << 8) | header[3] - rest = _sock.recv(length + 1) - if len(rest) < length + 1: + length = (data[2] << 8) | data[3] # 解析数据区长度 + if len(data) < 4 + length + 1: # 数据不完整 return None - return parse_frame(header + rest) + return parse_frame(data[:4 + length + 1]) # 解析并返回 (cmd, data) except Exception: return None -def _request_init() -> Optional[dict]: - if not _send(CMD_INIT): - return None - for _ in range(30): - result = _recv() - if result and result[0] == CMD_INIT: - data = result[1] - if len(data) >= 29: - return { - "width": (data[0] << 8) | data[1], - "height": (data[2] << 8) | data[3], - "mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8], - } - time.sleep(0.05) - return None - - def send_key(code: int) -> bool: - return _send(CMD_KEY, bytes([code & 0xFF])) + """发送按键码到设备""" + return _send(CMD_KEY, bytes([code & 0xFF])) # 按键码单字节 def fetch_screen() -> Optional[bytes]: - global _init_info - info = _init_info or _request_init() - if not info: + """ + 向设备请求当前屏幕显存,返回 1bpp 位图数据。 + 发送 CMD_LCDMEM + 起始地址 0;设备回复 payload 前 4 字节为地址,后续为位图,返回 payload[4:]。 + """ + if not _sock: return None - mem_size = info["mem_size"] - buffer = bytearray(mem_size) - pos = 0 - while pos < mem_size: - if not _send(CMD_LCDMEM, struct.pack(">I", pos)): - return None - for _ in range(40): - result = _recv() - if result and result[0] == CMD_LCDMEM: - payload = result[1] - if len(payload) >= 4: - start = struct.unpack(">I", payload[:4])[0] - chunk = payload[4:] - buffer[start : start + len(chunk)] = chunk - pos = start + len(chunk) - break - time.sleep(0.02) - else: - return None - return bytes(buffer) + if not _send(CMD_LCDMEM, struct.pack(">I", 0)): # 发送读取显存命令,起始地址 0(大端 4 字节) + return None + result = _recv() # 接收设备回复 + if result and result[0] == CMD_LCDMEM: # 确认为 LCDMEM 回复 + payload = result[1] # 数据区 + if len(payload) >= 4: # 前 4 字节为地址,后面是位图 + return payload[4:] + return payload or None + return None def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]: + """ + 将 1bpp 位图转为 PNG 字节流。 + 每字节 8 像素,高位在前;bi=(y*w+x)//8 为字节索引,bit=7-(x%8) 为位索引;1 为白 255,0 为黑 0。 + """ try: from PIL import Image - img = Image.new("1", (width, height)) - pix = img.load() + img = Image.new("1", (width, height)) # 创建 1bpp 黑白图像 + pix = img.load() # 获取像素访问对象 for y in range(height): for x in range(width): - bi = (y * width + x) // 8 - bit = 7 - (x % 8) - pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 - buf = io.BytesIO() - img.save(buf, format="PNG") - return buf.getvalue() + bi = (y * width + x) // 8 # 字节索引:每 8 像素一字节 + bit = 7 - (x % 8) # 位索引:高位在前 + pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 # 1→白 0→黑 + buf = io.BytesIO() # 内存缓冲区 + img.save(buf, format="PNG") # 保存为 PNG 格式 + return buf.getvalue() # 返回 PNG 字节流 except ImportError: return None -app = Flask(__name__) +def disconnect_device(): + """关闭与设备的连接""" + global _sock, _init_info + try: + if _sock: + _sock.close() # 关闭 socket + _sock = None # 清空引用 + _init_info = None # 清空初始化信息 + except Exception: + pass + + +# ============================================================================= +# 屏幕推送线程 +# ============================================================================= + +def _screen_refresh_loop(): + """ + 后台线程:每约 100ms 拉取一帧屏幕,转 PNG 后 base64 推送给各 WebSocket 客户端。 + _refresh_stop.wait(0.1) 既作间隔,也便于收到 set 时快速退出。 + """ + global _screen_clients, _refresh_stop + while not _refresh_stop.is_set(): # 未被要求停止时持续循环 + if _screen_clients and _sock: # 有客户端且已连接设备 + data = fetch_screen() # 拉取屏幕位图 + if data: + info = _init_info or {} # 获取宽高,缺省 160x160 + w, h = info.get("width", 160), info.get("height", 160) + png = mono_to_png(data, w, h) # 转为 PNG + if png: + b64 = base64.b64encode(png).decode("ascii") # base64 编码便于 JSON 传输 + for sid in list(_screen_clients): # 复制列表避免迭代时修改 + try: + socketio.emit("screen", {"png": b64, "w": w, "h": h}, room=sid) # 推送给该客户端 + except Exception: + pass + _refresh_stop.wait(timeout=0.1) # 等待 100ms,若被 set 则立即返回 + + +# ============================================================================= +# Flask + SocketIO +# ============================================================================= + +app = Flask(__name__) # 创建 Flask 应用 +app.config["SECRET_KEY"] = "remo_disp" # 会话密钥 +socketio = SocketIO(app, cors_allowed_origins="*") # 创建 SocketIO,允许跨域 + +FAVICON_PATH = os.path.join(os.path.dirname(__file__), "static", "favicon.ico") # favicon 文件路径 + + +@app.route("/favicon.ico") +def favicon(): + """提供网站图标""" + if os.path.exists(FAVICON_PATH): + return send_file(FAVICON_PATH, mimetype="image/x-icon") + return "", 404 # 不存在则返回 404 @app.route("/") @app.route("/index.html") def index(): + """主页,返回前端 HTML""" return send_file( os.path.join(os.path.dirname(__file__), "remo_disp_ui.html"), mimetype="text/html; charset=utf-8", ) -@app.route("/connect") -def api_connect(): - host = request.args.get("host", "").strip() - port = int(request.args.get("port", PORT)) - ok = connect(host, port) - return jsonify(success=ok, error=None if ok else "connect failed") +@socketio.on("connect") +def on_connect(): + """WebSocket 客户端连接时触发(可留空)""" + pass -@app.route("/key") -def api_key(): - code = int(request.args.get("code", 0)) - send_key(code) - return jsonify(ok=True) +@socketio.on("disconnect") +def on_disconnect(): + """WebSocket 客户端断开(关标签页等)时,从 _screen_clients 移除,若无客户端则断开设备""" + global _screen_clients, _refresh_thread, _refresh_stop + sid = request.sid # 获取断开客户端的 session id + if sid in _screen_clients: + _screen_clients.discard(sid) # 从订阅列表移除 + if not _screen_clients: # 若无其他客户端 + _refresh_stop.set() # 通知刷新线程停止 + disconnect_device() # 断开设备连接 -@app.route("/screen") -def api_screen(): - data = fetch_screen() - if not data: - return "", 500 - info = _init_info or {} - w, h = info.get("width", 160), info.get("height", 160) - png = mono_to_png(data, w, h) - if png: - return Response(png, mimetype="image/png") - resp = Response(data, mimetype="application/octet-stream") - resp.headers["X-Width"] = str(w) - resp.headers["X-Height"] = str(h) - return resp +@socketio.on("connect_device") +def on_connect_device(data): + """前端请求连接设备:连接成功后加入 _screen_clients,启动刷新线程,回复 connect_result""" + global _screen_clients, _refresh_thread, _refresh_stop + host = (data.get("host") or "").strip() # 从 data 取 host,去空格 + port = int(data.get("port") or PORT) # 从 data 取 port,缺省 7003 + ok = connect(host, port) # 建立 TCP 连接 + if ok: + _screen_clients.add(request.sid) # 将当前客户端加入屏幕订阅 + _refresh_stop.clear() # 清除停止标志,允许刷新线程运行 + if _refresh_thread is None or not _refresh_thread.is_alive(): # 刷新线程未运行 + _refresh_thread = threading.Thread(target=_screen_refresh_loop, daemon=True) # 创建后台线程 + _refresh_thread.start() # 启动线程 + emit("connect_result", {"success": True}) # 回复连接成功 + else: + emit("connect_result", {"success": False, "error": "connect failed"}) # 回复连接失败 +@socketio.on("disconnect_device") +def on_disconnect_device(): + """前端请求断开设备:从 _screen_clients 移除,若无客户端则断开设备,回复 disconnect_result""" + global _screen_clients, _refresh_stop + sid = request.sid + _screen_clients.discard(sid) # 从订阅列表移除 + if not _screen_clients: # 若无其他客户端 + _refresh_stop.set() # 通知刷新线程停止 + disconnect_device() # 断开设备连接 + emit("disconnect_result", {"ok": True}) # 回复断开成功 + + +@socketio.on("key") +def on_key(data): + """前端发送按键:从 data 取 code,转发给设备""" + code = int(data.get("code", 0)) # 按键码,缺省 0 + send_key(code) # 发送给 DTU 设备 + + +# ============================================================================= +# 启动 +# ============================================================================= + def main(): - port = 8181 + """主入口:启动 Web 服务""" + port = 8181 # HTTP 服务端口 print(f"远程显示服务: http://localhost:{port}") - print("在浏览器中打开上述地址,点击「连接」输入装置 IP") + print("使用 WebSocket 传输,在浏览器中打开上述地址") if len(sys.argv) >= 2: - connect(sys.argv[1]) + connect(sys.argv[1]) # 若命令行有 IP,预连接 print(f"已预连接: {sys.argv[1]}") - app.run(host="0.0.0.0", port=port, debug=False, threaded=True) + socketio.run(app, host="0.0.0.0", port=port, debug=True, allow_unsafe_werkzeug=True) # 监听所有网卡 if __name__ == "__main__": diff --git a/remo_disp_ui.html b/remo_disp_ui.html index 6f1a8d0..a73f513 100644 --- a/remo_disp_ui.html +++ b/remo_disp_ui.html @@ -1,51 +1,83 @@ + +
+ + +