This is the code we use for the A4S.
So basically we have the A4S_Sealer.py we can directly call from the shell like:
python A4S_sealer.py seal -p COM13
Getting this to work in VENUS required some nasty string formatting, I’m sure there are cleaner ways to do this. Bascially we have an HSL block to generate the command, and then just send it via the Shell:
str_python_path = "\"C:\Users\Hamilton\AppData\Local\Programs\Python\Python311\python.exe\"" + " \"" + "C:\Users\Hamilton\Desktop\Metabarcoding on Vantage\source\sub method librarys\\sealer_controler\A4S_Sealer.py" + "\" seal -p COM" + StrIStr(i_int_com_port);
If you need logging you could either make the driver write a logfile with a timestamp or directly route stdout to the log.
import argparse
import sys
import time
import serial
class A4SController:
def __init__(self, port='COM3', baudrate=19200, timeout=1.0, verify_seconds=2.5, skip_verify=False):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.verify_seconds = verify_seconds
self.skip_verify = skip_verify
self.serial_connection = None
def connect(self):
try:
self.serial_connection = serial.Serial(
self.port,
self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout,
)
print(f"Connected to A4S device on {self.port}.")
except Exception as e:
print(f"Failed to connect to A4S on {self.port}: {e}")
sys.exit(1)
if not self.skip_verify:
ok = self.verify_device(self.verify_seconds)
if ok:
print("Device fingerprint OK (A4S-style status detected).")
else:
print("WARNING: Did not see an A4S-style status line during verification.")
print(" If this port is correct and you are sure then,")
print(" rerun with --no-verify or increase --verify-seconds.")
sys.exit(3)
def close_connection(self):
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
print("Disconnected from A4S device.")
def send_command(self, command: str):
if self.serial_connection and self.serial_connection.is_open:
try:
self.serial_connection.write((command + '\r').encode('ascii'))
time.sleep(0.2)
return self.read_response()
except Exception as e:
print(f"Error sending command: {e}")
return None
else:
print("Serial connection is not open.")
return None
def read_response(self):
if self.serial_connection and self.serial_connection.is_open:
try:
data = self.serial_connection.read_until(b"\r")
if not data:
return None
return data.decode("ascii", errors="ignore").strip()
except Exception as e:
print(f"Error reading response: {e}")
return None
else:
print("Serial connection is not open.")
return None
def set_temperature(self, temperature_c: int):
if not (50 <= temperature_c <= 200):
print("Error: Temperature must be between 50 and 200 °C.")
return None
temp_param = f"{temperature_c * 10:04d}"
command = f"*00DH={temp_param}zz!"
print(f"Setting temperature to {temperature_c} °C with command: {command}")
return self.send_command(command)
def send_seal_command(self):
command = "*00GS=zz!"
print("Sending seal command...")
return self.send_command(command)
# -------- Verification / status parsing --------
def verify_device(self, probe_seconds: float = 2.5) -> bool:
"""Listen briefly for a recognizable A4S status line to confirm the right device."""
if not (self.serial_connection and self.serial_connection.is_open):
return False
# Clear any stale bytes and give the device a moment to start streaming
try:
self.serial_connection.reset_input_buffer()
self.serial_connection.reset_output_buffer()
except Exception:
pass
deadline = time.time() + probe_seconds
last_line = None
while time.time() < deadline:
data = self.serial_connection.read_until(b"\r")
if not data:
continue
line = data.decode("ascii", errors="ignore").strip()
last_line = line
# Accept a typical A4S telemetry line like: "*T...=temp,sys,heater,err,warn"
if self._parse_status_line(line) is not None:
return True
if last_line:
print(f"Last line received during verify: {last_line}")
return False
@staticmethod
def _parse_status_line(line: str):
# Expected: '*T...=temp_tenths,sys,heater,err,warn'
if not line or not line.startswith("*T"):
return None
try:
left, right = line.split("=", 1)
parts = right.split(",")
if len(parts) < 5:
return None
temp_tenths = int(parts[0])
sys_state = parts[1] # '0' -> Idle
heater_state = parts[2]
err_code = parts[3]
warn_code = parts[4]
return {
"timestamp_raw": left.replace("T", ""),
"temperature_c": temp_tenths / 10.0,
"is_idle": (sys_state == "0"),
"heater_state_raw": heater_state,
"error_code": err_code,
"warning_code": warn_code,
"raw": line,
}
except Exception:
return None
def wait_for_idle(self, timeout_sec: int = 120, require_active_transition: bool = True):
if not (self.serial_connection and self.serial_connection.is_open):
print("Serial connection is not open.")
return False
deadline = time.time() + timeout_sec
saw_active = False
while time.time() < deadline:
try:
line_bytes = self.serial_connection.read_until(b"\r")
except Exception as e:
print(f"Error while waiting for status: {e}")
return False
if not line_bytes:
continue
line = line_bytes.decode("ascii", errors="ignore").strip()
status = self._parse_status_line(line)
if status is None:
continue
if not status["is_idle"]:
saw_active = True
if status["is_idle"]:
if require_active_transition:
if saw_active:
return True
else:
return True
return False
def read_status_once(self):
print("Reading device status...")
for _ in range(3):
raw = self.read_response()
if not raw:
continue
parsed = self._parse_status_line(raw)
if parsed:
formatted = (
f"Timestamp: {parsed['timestamp_raw']}\n"
f"Current Temperature: {parsed['temperature_c']} °C\n"
f"System Status: {'Idle' if parsed['is_idle'] else 'Active'}\n"
f"Heater: {'Off' if parsed['heater_state_raw'] == '0' else 'Ready'}\n"
f"Error Code: {parsed['error_code']}\n"
f"Warning Code: {parsed['warning_code']}\n"
)
else:
formatted = f"Raw Data: {raw}"
print(formatted)
def temperature_type(s: str) -> int:
try:
v = int(s)
except ValueError:
raise argparse.ArgumentTypeError("temperature must be an integer (°C)")
if not (50 <= v <= 200):
raise argparse.ArgumentTypeError("temperature must be between 50 and 200 °C")
return v
def build_parser():
parent = argparse.ArgumentParser(add_help=False)
parent.add_argument("-p", "--port", required=True, help="Serial port (e.g., COM5 or /dev/ttyUSB0)")
parent.add_argument("--baudrate", type=int, default=19200, help="Baud rate (default: 19200)")
parent.add_argument("--timeout", type=float, default=1.0, help="Read timeout in seconds (default: 1.0)")
parent.add_argument("--no-verify", action="store_true", help="Skip post-connect device verification")
parent.add_argument("--verify-seconds", type=float, default=2.5,
help="Seconds to listen for status during verification (default: 2.5)")
parser = argparse.ArgumentParser(prog="sealer.py", description="A4S Heat Sealer Controller")
sub = parser.add_subparsers(dest="cmd", required=True)
# seal
p_seal = sub.add_parser("seal", parents=[parent], help="Send seal command and wait until Idle.")
p_seal.add_argument("--wait-timeout", type=int, default=120,
help="Timeout (seconds) for waiting for Idle (default: 120)")
p_seal.add_argument("--no-active-check", action="store_true",
help="Do not require seeing Active before confirming Idle")
# set-temp
p_temp = sub.add_parser("set-temp", parents=[parent], help="Set block temperature (°C).")
p_temp.add_argument("temperature", type=temperature_type, metavar="C",
help="Target temperature (50–200 °C)")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
a4s = A4SController(
port=args.port,
baudrate=getattr(args, "baudrate", 19200),
timeout=getattr(args, "timeout", 1.0),
verify_seconds=getattr(args, "verify_seconds", 2.5),
skip_verify=getattr(args, "no_verify", False),
)
a4s.connect()
try:
if args.cmd == "seal":
resp = a4s.send_seal_command()
print(f"Seal Command Response: {resp}")
ok = a4s.wait_for_idle(
timeout_sec=args.wait_timeout,
require_active_transition=(not args.no_active_check),
)
if ok:
print("sealed")
else:
print("Error: seal did not complete (timeout waiting for Idle).")
sys.exit(2)
elif args.cmd == "set-temp":
resp = a4s.set_temperature(args.temperature)
print(f"Set Temperature Response: {resp}")
finally:
a4s.close_connection()
if __name__ == "__main__":
main()