Files
DTU-RemtoeLCD/remo_disp_server.py

354 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示 Web 服务 - Flask + WebSocket 实现
通过 RemoDispBus 协议与 DTU 装置通信,将远程 LCD 画面以 Web 页面形式展示。
前端通过 WebSocket 连接,服务端主动推送屏幕数据,实现低延迟传输。
用法: python remo_disp_server.py [装置IP]
启动后访问 http://localhost:8181
依赖: pip install flask flask-socketio pillow
"""
# base64将二进制 PNG 编码为字符串,便于通过 JSON/WebSocket 传输
import base64
# socket与 DTU 装置的 TCP 通信
import socket
# struct将整数打包为二进制如起始地址 4 字节)
import struct
# threading后台线程持续拉取屏幕并推送
import threading
# sys读取命令行参数装置 IP
import sys
# os获取当前脚本目录拼接 favicon 路径
import os
# ioBytesIO 用于在内存中保存 PNG
import io
# typing类型注解
from typing import Optional, Tuple, Set
# FlaskWeb 框架,提供 HTTP 路由
from flask import Flask, request, send_file
# SocketIOWebSocket 支持,实现实时双向通信
from flask_socketio import SocketIO, emit
# =============================================================================
# 协议常量RemoDispBus
# =============================================================================
# TAG_CLIENT 0xAA客户端发送帧的帧头标识
# TAG_DEVICE 0xBB设备回复帧的帧头标识
TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB
# PORT 7003RemoDispBus 协议默认端口
PORT = 7003
# CMD_KEEPLIVE 0保活
# CMD_INIT 1初始化获取屏幕宽高、显存大小
# CMD_KEY 2按键
# CMD_LCDMEM 3读取显存屏幕画面
CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3
# =============================================================================
# 全局状态
# =============================================================================
# _sock与 DTU 装置的 TCP socketNone 表示未连接
_sock: Optional[socket.socket] = None
# _screen_clients当前需要接收屏幕推送的 WebSocket 客户端 sid 集合
_screen_clients: Set[str] = set()
# _refresh_thread后台刷新线程持续拉取屏幕并推送
_refresh_thread: Optional[threading.Thread] = None
# _refresh_stopEventset 时刷新线程退出
_refresh_stop = threading.Event()
# =============================================================================
# 协议帧编解码
# =============================================================================
def calc_crc(data: bytes) -> int:
"""
对数据区逐字节异或,取低 8 位作为 CRC 校验码。
协议规定CRC = data[0] ^ data[1] ^ ... ^ data[n-1],取低 8 位。
"""
crc = 0 # 初始值为 0
for b in data:
crc ^= b # 逐字节异或
return crc & 0xFF # 取低 8 位作为最终 CRC
def build_frame(cmd: int, data: bytes) -> bytes:
"""
构造一帧发送数据,格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data][crc(1)]
length 大端 2 字节crc 对 data 计算。
"""
length = len(data) # 数据区长度
# 构造帧头TAG_CLIENT + cmd + 长度高字节 + 长度低字节(大端序)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)]) # 帧头 + 数据 + CRC
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""
解析设备回复帧,返回 (cmd, data) 或 None。
帧格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data(length)][crc(1)]
"""
if len(raw) < 6 or raw[0] != TAG_DEVICE: # 至少 6 字节且帧头为 TAG_DEVICE
print("[parse_frame] 错误: 帧太短或帧头不是 TAG_DEVICE(0xBB), len=%d" % len(raw))
return None
length = (raw[2] << 8) | raw[3] # 大端序解析数据区长度
if len(raw) < 4 + length + 1: # 检查是否收齐整帧头4字节+数据+CRC1字节
print("[parse_frame] 错误: 数据不完整, 需要 %d 字节, 实际 %d 字节" % (4 + length + 1, len(raw)))
return None
data = raw[4:4 + length] # 提取数据区
if calc_crc(data) != raw[4 + length]: # CRC 校验失败
print("[parse_frame] 错误: CRC 校验失败")
return None
return (raw[1], data) # 返回 (命令码, 数据)
# =============================================================================
# 连接与收发
# =============================================================================
def connect(host: str, port: int = PORT) -> bool:
"""
建立与 DTU 装置的 TCP 连接。
若有旧连接则先关闭;连接时超时 3 秒;连接成功后设为 2 秒。
"""
global _sock
try:
if _sock:
_sock.close() # 关闭旧连接
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 TCP socket
_sock.settimeout(3.0) # 连接阶段超时 3 秒
_sock.connect((host, port)) # 连接目标主机和端口
_sock.settimeout(2.0) # 连接成功后收发超时 2 秒
return True
except Exception as e:
print("[connect] 错误: 连接失败 - %s" % e)
return False
def _send(cmd: int, data: bytes = b"") -> bool:
"""发送一帧到设备"""
if not _sock:
print("[_send] 错误: 未连接设备")
return False
try:
_sock.sendall(build_frame(cmd, data)) # 构造帧并一次性发送
return True
except Exception as e:
print("[_send] 错误: 发送失败 - %s" % e)
return False
def _recv() -> Optional[Tuple[int, bytes]]:
"""
从设备接收一帧,一次性 recv 最多 256KB。
至少需 5 字节4 字节头+1 字节数据或 CRC根据头中 length 检查是否收齐整帧。
"""
if not _sock:
print("[_recv] 错误: 未连接设备")
return None
try:
data = _sock.recv(256 * 1024) # 单次接收最多 256KB
if len(data) < 5: # 至少需要 5 字节4 头 + 1 数据/CRC
print("[_recv] 错误: 接收数据太短, len=%d" % len(data))
return None
length = (data[2] << 8) | data[3] # 解析数据区长度
if len(data) < 4 + length + 1: # 数据不完整
print("[_recv] 错误: 数据不完整, 需要 %d 字节, 实际 %d 字节" % (4 + length + 1, len(data)))
return None
return parse_frame(data[:4 + length + 1]) # 解析并返回 (cmd, data)
except Exception as e:
print("[_recv] 错误: 接收异常 - %s" % e)
return None
def send_key(code: int) -> bool:
"""发送按键码到设备"""
return _send(CMD_KEY, bytes([code & 0xFF])) # 按键码单字节
def fetch_screen() -> Optional[bytes]:
"""
向设备请求当前屏幕显存,返回 1bpp 位图数据。
发送 CMD_LCDMEM + 起始地址 0设备回复 payload 前 4 字节为地址,后续为位图,返回 payload[4:]。
"""
if not _sock:
print("[fetch_screen] 错误: 未连接设备")
return None
if not _send(CMD_LCDMEM, struct.pack(">I", 0)): # 发送读取显存命令,起始地址 0大端 4 字节)
print("[fetch_screen] 错误: 发送 CMD_LCDMEM 失败")
return None
result = _recv() # 接收设备回复
if result and result[0] == CMD_LCDMEM: # 确认为 LCDMEM 回复
payload = result[1] # 数据区
if len(payload) >= 4: # 前 4 字节为地址,后面是位图
return payload[4:]
return payload or None
print("[fetch_screen] 错误: 未收到有效 LCDMEM 回复")
return None
def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]:
"""
将 1bpp 位图转为 PNG 字节流。
每字节 8 像素高位在前bi=(y*w+x)//8 为字节索引bit=7-(x%8) 为位索引1 为白 2550 为黑 0。
"""
try:
from PIL import Image
img = Image.new("1", (width, height)) # 创建 1bpp 黑白图像
pix = img.load() # 获取像素访问对象
for y in range(height):
for x in range(width):
bi = (y * width + x) // 8 # 字节索引:每 8 像素一字节
bit = 7 - (x % 8) # 位索引:高位在前
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 # 1→白 0→黑
buf = io.BytesIO() # 内存缓冲区
img.save(buf, format="PNG") # 保存为 PNG 格式
return buf.getvalue() # 返回 PNG 字节流
except ImportError:
print("[mono_to_png] 错误: 缺少 PIL/Pillow, 请执行 pip install pillow")
return None
def disconnect_device():
"""关闭与设备的连接"""
global _sock
try:
if _sock:
_sock.close() # 关闭 socket
_sock = None # 清空引用
except Exception as e:
print("[disconnect_device] 错误: 关闭连接时异常 - %s" % e)
# =============================================================================
# 屏幕推送线程
# =============================================================================
def _screen_refresh_loop():
"""
后台线程:每约 100ms 拉取一帧屏幕,转 PNG 后 base64 推送给各 WebSocket 客户端。
_refresh_stop.wait(0.1) 既作间隔,也便于收到 set 时快速退出。
"""
global _screen_clients, _refresh_stop
while not _refresh_stop.is_set(): # 未被要求停止时持续循环
if _screen_clients and _sock: # 有客户端且已连接设备
data = fetch_screen() # 拉取屏幕位图
if data:
info = {"width": 160, "height": 160} # 设置长宽 160x160
w, h = info.get("width", 160), info.get("height", 160)
png = mono_to_png(data, w, h) # 转为 PNG
if png:
b64 = base64.b64encode(png).decode("ascii") # base64 编码便于 JSON 传输
for sid in list(_screen_clients): # 复制列表避免迭代时修改
try:
socketio.emit("screen", {"png": b64, "w": w, "h": h}, room=sid) # 推送给该客户端
except Exception:
pass
_refresh_stop.wait(timeout=0.1) # 等待 100ms若被 set 则立即返回
# =============================================================================
# Flask + SocketIO
# =============================================================================
app = Flask(__name__) # 创建 Flask 应用
app.config["SECRET_KEY"] = "remo_disp" # 会话密钥
socketio = SocketIO(app, cors_allowed_origins="*") # 创建 SocketIO允许跨域
FAVICON_PATH = os.path.join(os.path.dirname(__file__), "static", "favicon.ico") # favicon 文件路径
@app.route("/favicon.ico")
def favicon():
"""提供网站图标"""
if os.path.exists(FAVICON_PATH):
return send_file(FAVICON_PATH, mimetype="image/x-icon")
return "", 404 # 不存在则返回 404
@app.route("/")
@app.route("/index.html")
def index():
"""主页,返回前端 HTML"""
return send_file(
os.path.join(os.path.dirname(__file__), "remo_disp_ui.html"),
mimetype="text/html; charset=utf-8",
)
@socketio.on("connect")
def on_connect():
"""WebSocket 客户端连接时触发(可留空)"""
pass
@socketio.on("disconnect")
def on_disconnect():
"""WebSocket 客户端断开(关标签页等)时,从 _screen_clients 移除,若无客户端则断开设备"""
global _screen_clients, _refresh_thread, _refresh_stop
sid = request.sid # 获取断开客户端的 session id
if sid in _screen_clients:
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
@socketio.on("connect_device")
def on_connect_device(data):
"""前端请求连接设备:连接成功后加入 _screen_clients启动刷新线程回复 connect_result"""
global _screen_clients, _refresh_thread, _refresh_stop
host = (data.get("host") or "").strip() # 从 data 取 host去空格
port = int(data.get("port") or PORT) # 从 data 取 port缺省 7003
ok = connect(host, port) # 建立 TCP 连接
if ok:
_screen_clients.add(request.sid) # 将当前客户端加入屏幕订阅
_refresh_stop.clear() # 清除停止标志,允许刷新线程运行
if _refresh_thread is None or not _refresh_thread.is_alive(): # 刷新线程未运行
_refresh_thread = threading.Thread(target=_screen_refresh_loop, daemon=True) # 创建后台线程
_refresh_thread.start() # 启动线程
emit("connect_result", {"success": True}) # 回复连接成功
else:
emit("connect_result", {"success": False, "error": "connect failed"}) # 回复连接失败
@socketio.on("disconnect_device")
def on_disconnect_device():
"""前端请求断开设备:从 _screen_clients 移除,若无客户端则断开设备,回复 disconnect_result"""
global _screen_clients, _refresh_stop
sid = request.sid
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
emit("disconnect_result", {"ok": True}) # 回复断开成功
@socketio.on("key")
def on_key(data):
"""前端发送按键:从 data 取 code转发给设备"""
code = int(data.get("code", 0)) # 按键码,缺省 0
send_key(code) # 发送给 DTU 设备
# =============================================================================
# 启动
# =============================================================================
def main():
"""主入口:启动 Web 服务"""
port = 8181 # HTTP 服务端口
print(f"远程显示服务: http://localhost:{port}")
print("使用 WebSocket 传输,在浏览器中打开上述地址")
if len(sys.argv) >= 2:
connect(sys.argv[1]) # 若命令行有 IP预连接
print(f"已预连接: {sys.argv[1]}")
socketio.run(app, host="0.0.0.0", port=port, debug=True, allow_unsafe_werkzeug=True) # 监听所有网卡
if __name__ == "__main__":
main()