Files
DTU-RemtoeLCD/remo_disp_server.py
Wanderingss f7898adc5d 1. 修改了,远程服务端断开就卡死的bug,
2. 修改图像解析逻辑,使其支持DTU-HMI 的图像格式,不再支持原来的图像格式,不能显示原来设备发来的图像
2026-03-12 18:06:07 +08:00

505 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示 Web 服务 - Flask + WebSocket 实现
通过 RemoDispBus 协议与 DTU 装置通信,将远程 LCD 画面以 Web 页面形式展示。
前端通过 WebSocket 连接,服务端主动推送屏幕数据,实现低延迟传输。
用法: python remo_disp_server.py [装置IP]
启动后访问 http://localhost:8181
依赖: pip install flask flask-socketio pillow loguru
日志级别: 在代码中设置 LOG_LEVEL见下方"INFO" 不显示 DEBUG"DEBUG" 显示调试信息。
"""
# base64将二进制 PNG 编码为字符串,便于通过 JSON/WebSocket 传输
import base64
# socket与 DTU 装置的 TCP 通信
import socket
# struct将整数打包为二进制如起始地址 4 字节)
import struct
# threading后台线程持续拉取屏幕并推送
import threading
# sys读取命令行参数装置 IP
import sys
# os获取当前脚本目录、路径与文件检查
import os
# ioBytesIO 用于在内存中保存 PNG
import io
def _get_base_path() -> str:
"""打包成 exe 时资源在 sys._MEIPASS否则为脚本所在目录。"""
if getattr(sys, "frozen", False):
return sys._MEIPASS # type: ignore[attr-defined]
return os.path.dirname(os.path.abspath(__file__))
# typing类型注解
from typing import Optional, Tuple, Set
# loguru结构化日志记录错误与调试信息
from loguru import logger
# FlaskWeb 框架,提供 HTTP 路由
from flask import Flask, request, send_file
# SocketIOWebSocket 支持,实现实时双向通信
from flask_socketio import SocketIO, emit
# =============================================================================
# 协议常量RemoDispBus
# =============================================================================
# TAG_CLIENT 0xAA客户端发送帧的帧头标识
# TAG_DEVICE 0xBB设备回复帧的帧头标识
TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB
# PORT 7003RemoDispBus 协议默认端口
PORT = 7003
# CMD_KEEPLIVE 0保活
# CMD_INIT 1初始化获取屏幕宽高、显存大小
# CMD_KEY 2按键
# CMD_LCDMEM 3读取显存屏幕画面
CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3
# =============================================================================
# 日志级别(在代码中直接修改)
# =============================================================================
# "INFO":仅输出 INFO/WARNING/ERROR不显示 DEBUG
# "DEBUG":输出包括 DEBUG 在内的全部日志
LOG_LEVEL = "INFO"
# =============================================================================
# 全局状态
# =============================================================================
# _sock与 DTU 装置的 TCP socketNone 表示未连接
_sock: Optional[socket.socket] = None
# _screen_clients当前需要接收屏幕推送的 WebSocket 客户端 sid 集合
_screen_clients: Set[str] = set()
# _refresh_thread后台刷新线程持续拉取屏幕并推送
_refresh_thread: Optional[threading.Thread] = None
# _refresh_stopEventset 时刷新线程退出
_refresh_stop = threading.Event()
# =============================================================================
# 协议帧编解码
# =============================================================================
def calc_crc(data: bytes) -> int:
"""
对数据区逐字节异或,取低 8 位作为 CRC 校验码。
协议规定CRC = data[0] ^ data[1] ^ ... ^ data[n-1],取低 8 位。
"""
crc = 0 # 初始值为 0
for b in data:
crc ^= b # 逐字节异或
return crc & 0xFF # 取低 8 位作为最终 CRC
def build_frame(cmd: int, data: bytes) -> bytes:
"""
构造一帧发送数据,格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data][crc(1)]
length 大端 2 字节crc 对 data 计算。
"""
length = len(data) # 数据区长度
# 构造帧头TAG_CLIENT + cmd + 长度高字节 + 长度低字节(大端序)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)]) # 帧头 + 数据 + CRC
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""
解析设备回复帧,返回 (cmd, data) 或 None。
帧格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data(length)][crc(1)]
最短合法帧长度为 5 字节(长度为 0 时: 1+1+2+0+1
"""
if len(raw) < 5 or raw[0] != TAG_DEVICE: # 至少 5 字节且帧头为 TAG_DEVICE
logger.error(
"[parse_frame] 帧太短或帧头不是 TAG_DEVICE(0xBB), len={len_}",
len_=len(raw),
)
return None
length = (raw[2] << 8) | raw[3] # 大端序解析数据区长度
if len(raw) < 4 + length + 1: # 检查是否收齐整帧头4字节+数据+CRC1字节
logger.error(
"[parse_frame] 数据不完整, 需要 {need} 字节, 实际 {actual} 字节",
need=4 + length + 1,
actual=len(raw),
)
return None
data = raw[4:4 + length] # 提取数据区
if calc_crc(data) != raw[4 + length]: # CRC 校验失败
logger.error("[parse_frame] CRC 校验失败")
return None
cmd = raw[1]
logger.debug("[parse_frame] 成功解析帧 cmd={cmd:#x}, length={length}", cmd=cmd, length=length)
return (cmd, data) # 返回 (命令码, 数据)
# =============================================================================
# 连接与收发
# =============================================================================
def connect(host: str, port: int = PORT) -> bool:
"""
建立与 DTU 装置的 TCP 连接。
若有旧连接则先关闭;连接时超时 3 秒;连接成功后设为 2 秒。
"""
global _sock
try:
if _sock:
_sock.close() # 关闭旧连接
logger.info("[connect] 已关闭旧连接")
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 TCP socket
_sock.settimeout(3.0) # 连接阶段超时 3 秒
_sock.connect((host, port)) # 连接目标主机和端口
_sock.settimeout(2.0) # 连接成功后收发超时 2 秒
logger.info("[connect] 连接 DTU 成功: {host}:{port}", host=host, port=port)
return True
except Exception as e:
logger.exception("[connect] 连接失败: {host}:{port} - {err}", host=host, port=port, err=e)
return False
def _send(cmd: int, data: bytes = b"") -> bool:
"""发送一帧到设备"""
if not _sock:
logger.error("[_send] 未连接设备,无法发送 cmd={cmd:#x}", cmd=cmd)
return False
try:
frame = build_frame(cmd, data)
_sock.sendall(frame) # 构造帧并一次性发送
logger.debug("[_send] 已发送 cmd={cmd:#x}, data_len={length}", cmd=cmd, length=len(data))
return True
except Exception as e:
logger.exception("[_send] 发送失败 cmd={cmd:#x} - {err}", cmd=cmd, err=e)
return False
def _recv() -> Optional[Tuple[int, bytes]]:
"""
从设备接收一帧。若首次接收长度不足,会再尝试接收最多 2 次(共 3 次),
拼齐整帧后再解析。单次 recv 最多 256KB。
"""
if not _sock:
logger.error("[_recv] 未连接设备,无法接收数据")
return None
try:
buffer = b""
max_attempts = 3 # 首次 + 最多再收 2 次
for attempt in range(max_attempts):
chunk = _sock.recv(256 * 1024)
if not chunk:
logger.debug("[_recv] 第 {n} 次 recv 收到空数据,连接可能已关闭", n=attempt + 1)
break
buffer += chunk
if len(buffer) < 5: # 至少需要 5 字节才能解析头
logger.debug("[_recv] 第 {n} 次 recv 后共 {len_} 字节,继续接收", n=attempt + 1, len_=len(buffer))
continue
data_len = (buffer[2] << 8) | buffer[3] # 数据区长度
need = 4 + data_len + 1 # 整帧长度
if len(buffer) >= need:
logger.debug("[_recv] 第 {n} 次 recv 后收齐整帧 len={len_}", n=attempt + 1, len_=need)
return parse_frame(buffer[:need])
logger.debug(
"[_recv] 第 {n} 次 recv 后共 {have} 字节,需要 {need} 字节,继续接收",
n=attempt + 1,
have=len(buffer),
need=need,
)
if len(buffer) < 5:
logger.error("[_recv] 接收数据太短, len={len_}", len_=len(buffer))
return None
need = 4 + ((buffer[2] << 8) | buffer[3]) + 1
logger.error(
"[_recv] 尝试 {max_attempts} 次后数据仍不完整, 需要 {need} 字节, 实际 {actual} 字节",
max_attempts=max_attempts,
need=need,
actual=len(buffer),
)
return None
except Exception as e:
logger.exception("[_recv] 接收异常 - {err}", err=e)
return None
def send_key(code: int) -> bool:
"""发送按键码到设备"""
return _send(CMD_KEY, bytes([code & 0xFF])) # 按键码单字节
def fetch_screen() -> Optional[bytes]:
"""
向设备请求当前屏幕显存,返回 1bpp 位图数据。
发送 CMD_LCDMEM + 起始地址 0设备回复 payload 前 4 字节为地址,后续为位图,返回 payload[4:]。
"""
if not _sock:
logger.error("[fetch_screen] 未连接设备,无法读取屏幕")
return None
if not _send(CMD_LCDMEM, struct.pack(">I", 0)): # 发送读取显存命令,起始地址 0大端 4 字节)
logger.error("[fetch_screen] 发送 CMD_LCDMEM 失败")
return None
logger.debug("[fetch_screen] 发送 CMD_LCDMEM 成功")
result = _recv() # 接收设备回复
if not result:
logger.error("[fetch_screen] 未收到任何回复result=None")
return None
cmd, payload = result
if cmd != CMD_LCDMEM:
# 可能是按键 ACK、保活等其他短帧按键操作时较常见这里只做调试日志不视为错误
logger.debug("[fetch_screen] 收到非 LCDMEM 帧 cmd={cmd:#x},忽略", cmd=cmd)
return None
# 确认为 LCDMEM 回复
if len(payload) >= 4: # 前 4 字节为地址,后面是位图
logger.debug("[fetch_screen] 收到 LCDMEMpayload_len={len_}", len_=len(payload))
return payload[4:]
logger.debug("[fetch_screen] 收到 LCDMEMpayload_len={len_} (<4)", len_=len(payload))
return payload or None
def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]:
"""
将 1bpp 位图转为 PNG 字节流。
每字节 8 像素高位在前bi=(y*w+x)//8 为字节索引bit=7-(x%8) 为位索引1 为白 2550 为黑 0。
"""
try:
from PIL import Image
logger.debug(
"[mono_to_png] 开始转换为 PNGwidth={width}, height={height}, data_len={len_}",
width=width,
height=height,
len_=len(data),
)
img = Image.new("1", (width, height)) # 创建 1bpp 黑白图像
pix = img.load() # 获取像素访问对象
'''
for y in range(height):
for x in range(width):
bi = (y * width + x) // 8 # 字节索引:每 8 像素一字节
bit = 7 - (x % 8) # 位索引:高位在前
pix[x, y] = 0 if (bi < len(data) and (data[bi] >> bit) & 1) else 255 # 1→白 0→黑
'''
for y in range(height):
for x in range(width):
pix[x, y] = data[y * width + x]
buf = io.BytesIO() # 内存缓冲区
img.save(buf, format="PNG") # 保存为 PNG 格式
png_bytes = buf.getvalue()
logger.debug("[mono_to_png] 转换 PNG 成功, png_len={len_}", len_=len(png_bytes))
return png_bytes # 返回 PNG 字节流
except ImportError:
logger.error("[mono_to_png] 缺少 PIL/Pillow, 请执行 pip install pillow")
return None
except Exception as e:
logger.exception("[mono_to_png] 转换 PNG 失败 - {err}", err=e)
return None
def disconnect_device():
"""关闭与设备的连接"""
global _sock
try:
if _sock:
logger.info("[disconnect_device] 正在关闭与 DTU 的连接")
_sock.close() # 关闭 socket
_sock = None # 清空引用
except Exception as e:
logger.exception("[disconnect_device] 关闭连接时异常 - {err}", err=e)
# =============================================================================
# 屏幕推送线程
# =============================================================================
def _screen_refresh_loop():
"""
后台线程:每约 100ms 拉取一帧屏幕,转 PNG 后 base64 推送给各 WebSocket 客户端。
若检测到远程服务端断开(连续多次拉取失败),则断开设备并停止重复请求。
_refresh_stop.wait(0.1) 既作间隔,也便于收到 set 时快速退出。
"""
global _screen_clients, _refresh_stop
logger.info("[_screen_refresh_loop] 刷新线程启动")
# 连续拉取失败次数,超过阈值则认为远程已断开
MAX_CONSECUTIVE_FAILURES = 5
consecutive_failures = 0
try:
while not _refresh_stop.is_set(): # 未被要求停止时持续循环
if _screen_clients and _sock: # 有客户端且已连接设备
data = fetch_screen() # 拉取屏幕位图
logger.debug("[fetch_screen] 拉取屏幕位图数据: {data}", data=data)
if data:
consecutive_failures = 0 # 成功则清零
# 当前协议未返回宽高,这里固定为 160x160可根据实际情况调整
w, h = 160, 160
png = mono_to_png(data, w, h) # 转为 PNG
if png:
b64 = base64.b64encode(png).decode("ascii") # base64 编码便于 JSON 传输
for sid in list(_screen_clients): # 复制列表避免迭代时修改
try:
socketio.emit(
"screen",
{"png": b64, "w": w, "h": h},
room=sid,
) # 推送给该客户端
except Exception as e:
logger.exception(
"[_screen_refresh_loop] 推送 screen 给 {sid} 失败 - {err}",
sid=sid,
err=e,
)
else:
# 拉取失败,累计连续失败次数
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
logger.warning(
"[_screen_refresh_loop] 连续 {n} 次拉取失败,判定远程服务端已断开,停止请求并断开连接",
n=consecutive_failures,
)
disconnect_device()
# 通知所有订阅客户端:设备已断开
for sid in list(_screen_clients):
try:
socketio.emit("device_disconnected", {"reason": "remote_closed"}, room=sid)
except Exception as e:
logger.debug("[_screen_refresh_loop] 推送 device_disconnected 失败 sid={sid} - {err}", sid=sid, err=e)
consecutive_failures = 0
else:
consecutive_failures = 0 # 无客户端或未连接时清零,便于下次连接后重新计数
_refresh_stop.wait(timeout=0.1) # 等待 100ms若被 set 则立即返回
finally:
logger.info("[_screen_refresh_loop] 刷新线程退出")
# =============================================================================
# Flask + SocketIO
# =============================================================================
app = Flask(__name__) # 创建 Flask 应用
app.config["SECRET_KEY"] = "remo_disp" # 会话密钥
socketio = SocketIO(app, cors_allowed_origins="*") # 创建 SocketIO允许跨域
FAVICON_PATH = os.path.join(_get_base_path(), "static", "favicon.ico") # favicon 文件路径
@app.route("/favicon.ico")
def favicon():
"""提供网站图标"""
if os.path.exists(FAVICON_PATH):
return send_file(FAVICON_PATH, mimetype="image/x-icon")
return "", 404 # 不存在则返回 404
@app.route("/")
@app.route("/index.html")
def index():
"""主页,返回前端 HTML"""
return send_file(
os.path.join(_get_base_path(), "remo_disp_ui.html"),
mimetype="text/html; charset=utf-8",
)
@socketio.on("connect")
def on_connect():
"""WebSocket 客户端连接时触发(可留空)"""
logger.info("[on_connect] WebSocket 客户端已连接 sid={sid}", sid=request.sid)
@socketio.on("disconnect")
def on_disconnect():
"""WebSocket 客户端断开(关标签页等)时,从 _screen_clients 移除,若无客户端则断开设备"""
global _screen_clients, _refresh_thread, _refresh_stop
sid = request.sid # 获取断开客户端的 session id
logger.info("[on_disconnect] WebSocket 客户端断开 sid={sid}", sid=sid)
if sid in _screen_clients:
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
@socketio.on("connect_device")
def on_connect_device(data):
"""前端请求连接设备:连接成功后加入 _screen_clients启动刷新线程回复 connect_result"""
global _screen_clients, _refresh_thread, _refresh_stop
host = (data.get("host") or "").strip() # 从 data 取 host去空格
port = int(data.get("port") or PORT) # 从 data 取 port缺省 7003
logger.info("[on_connect_device] 请求连接 DTU: {host}:{port}", host=host, port=port)
ok = connect(host, port) # 建立 TCP 连接
if ok:
_screen_clients.add(request.sid) # 将当前客户端加入屏幕订阅
_refresh_stop.clear() # 清除停止标志,允许刷新线程运行
if _refresh_thread is None or not _refresh_thread.is_alive(): # 刷新线程未运行
_refresh_thread = threading.Thread(target=_screen_refresh_loop, daemon=True) # 创建后台线程
_refresh_thread.start() # 启动线程
logger.info("[on_connect_device] 已启动屏幕刷新线程")
emit("connect_result", {"success": True}) # 回复连接成功
logger.info("[on_connect_device] 连接 DTU 成功,已加入订阅 sid={sid}", sid=request.sid)
else:
logger.error("[on_connect_device] 连接 DTU 失败: {host}:{port}", host=host, port=port)
emit("connect_result", {"success": False, "error": "connect failed"}) # 回复连接失败
@socketio.on("disconnect_device")
def on_disconnect_device():
"""前端请求断开设备:从 _screen_clients 移除,若无客户端则断开设备,回复 disconnect_result"""
global _screen_clients, _refresh_stop
sid = request.sid
logger.info("[on_disconnect_device] 前端请求断开设备 sid={sid}", sid=sid)
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
emit("disconnect_result", {"ok": True}) # 回复断开成功
logger.info("[on_disconnect_device] 设备已断开(若为最后一个客户端)")
@socketio.on("key")
def on_key(data):
"""前端发送按键:从 data 取 code转发给设备"""
code = int(data.get("code", 0)) # 按键码,缺省 0
logger.debug("[on_key] 收到按键 code={code:#x}", code=code)
send_key(code) # 发送给 DTU 设备
# =============================================================================
# 启动
# =============================================================================
def _configure_log_level():
"""根据全局 LOG_LEVEL 配置日志级别(见文件顶部 LOG_LEVEL 常量)。"""
level = LOG_LEVEL.upper() if isinstance(LOG_LEVEL, str) else "INFO"
if level not in ("TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
level = "INFO"
logger.remove() # 移除默认的 stderr 输出
logger.add(sys.stderr, level=level, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>")
logger.add(
"remo_disp.log",
rotation="1 week",
encoding="utf-8",
enqueue=True,
backtrace=True,
diagnose=False,
level=level,
)
return level
@logger.catch
def main():
"""主入口:启动 Web 服务"""
port = 8181 # HTTP 服务端口
log_level = _configure_log_level()
logger.info("日志级别: {level}", level=log_level)
logger.info("远程显示服务: http://localhost:{port}", port=port)
logger.info("使用 WebSocket 传输,在浏览器中打开上述地址")
if len(sys.argv) >= 2:
host = sys.argv[1]
logger.info("[main] 尝试预连接 DTU: {host}", host=host)
if connect(host): # 若命令行有 IP预连接
logger.info("[main] 已预连接: {host}", host=host)
else:
logger.error("[main] 预连接失败: {host}", host=host)
socketio.run(app, host="0.0.0.0", port=port, debug=True, allow_unsafe_werkzeug=True) # 监听所有网卡
if __name__ == "__main__":
main()