In-House Drivers Integration

Hi everyone,

I want to open a discussion about building drivers for integrated devices on liquid handlers. For context, my company is interested in integrating third-party devices (plate readers, centrifuges, etc.) on our STAR instruments. While I’m aware that companies like Hamilton probably have some of these drivers, we are leaning towards an in-house/DIY solution for cost and flexibility.

I am curious whether anyone has done this before and is willing to share pointers, such as a good starting point or the right direction for building a driver from scratch.

-Nat

5 Likes

There are many ways that you can accomplish this. It used to be quite difficult, but with tools like Gemini CLI and ChatGPT, and many more examples out there on GitHub, it’s getting a lot easier.

If I were going to do this, I would get the API command set and ask Gemini CLI to create either a Python script or a C# command-line exe that accepts the device task properties as arguments. You can then call the Python script from a batch file in Venus, or the exe with the command line / run external application.

There would definitely be inefficiencies and errors, but with a few iterations you could get something working pretty quickly.

4 Likes

There’s definitely a benefit to vendor-provided solutions: less time to implementation, reliability (in some instances), support, and even system testing docs.

The challenge is that sometimes you have to write an instrument driver, sometimes you have to write a software driver to control the hardware, and sometimes you have to write both. :pensive_face:

You can create scripts, EXEs or even DLLs that you just load! (Tons of options!)

However, my preferred practice is to implement abstraction layers around all external APIs, regardless of their origin. This typically involves creating service wrappers that normalize vendor-specific interfaces into consistent internal contracts. I prefer architecting these as RESTful web services with OpenAPI/Swagger documentation, which provides several benefits: standardized endpoint documentation, automatic client SDK generation, and a foundation for building responsive web applications or MCP tools. This approach also facilitates microservice architectures and makes it easier to swap out vendor implementations without affecting downstream consumers. Plus, you can add observability and logging layers to really bring your lab into this century.
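To make the abstraction-layer idea concrete, here is a minimal sketch in plain Python rather than a full REST service. Every name in it (`Sealer`, `VendorASealer`, `VendorBSealer`, `seal_plate`) and both command formats are made up for illustration; the point is only that downstream code depends on one internal contract while each vendor wrapper translates to its own protocol.

```python
from abc import ABC, abstractmethod


class Sealer(ABC):
    """Internal contract that method code depends on (hypothetical)."""

    @abstractmethod
    def set_temperature(self, celsius: int) -> None: ...

    @abstractmethod
    def seal(self) -> bool: ...


class VendorASealer(Sealer):
    """Wrapper for an imaginary vendor whose protocol uses tenths of a degree."""

    def __init__(self):
        self.sent = []  # stands in for a serial port or vendor SDK

    def set_temperature(self, celsius: int) -> None:
        self.sent.append(f"TEMP={celsius * 10:04d}")

    def seal(self) -> bool:
        self.sent.append("SEAL")
        return True


class VendorBSealer(Sealer):
    """Wrapper for a second imaginary vendor with a different command style."""

    def __init__(self):
        self.sent = []

    def set_temperature(self, celsius: int) -> None:
        self.sent.append({"cmd": "set_temp", "value_c": celsius})

    def seal(self) -> bool:
        self.sent.append({"cmd": "seal"})
        return True


def seal_plate(sealer: Sealer, celsius: int) -> bool:
    """Downstream code only knows the Sealer contract, not the vendor."""
    sealer.set_temperature(celsius)
    return sealer.seal()


# Demo: the same call works against either vendor wrapper.
vendor_a = VendorASealer()
vendor_b = VendorBSealer()
print(seal_plate(vendor_a, 160), vendor_a.sent)
print(seal_plate(vendor_b, 160), vendor_b.sent)
```

Swapping vendors then means writing one new wrapper class; a REST layer with OpenAPI docs could sit on top of the same contract.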

With that said, you should try on your own (with Claude Code + context aka docs) and then try other solutions like this one: Introducing the Connector-Factory: a python template for kickstarting device integration with UniteLabs CDK

6 Likes

I’ve written a few in Python, for speed of implementation, along the lines of what @RKeyser suggests, for cases where there isn’t a library already. There are certainly more graceful solutions, but you can pass exit codes back to VENUS and it has yet to fail me.

1 Like

I have thought of using Python myself, as I have done so in the past, but on a very small scale and not a hardware integration where two-way communication is needed. That being said, the only issue I have with this approach is establishing a sufficient means of passing parameters (or exit codes, as you mentioned) between Venus and the Python script. The only option I could think of is passing the parameters via .csv files, but I was wondering if there are other ways instead.

If you build a command-line interface with e.g. the argparse library, you can pass as many arguments as you want via a shell command. The tricky part is getting return values: as of now, the only thing VENUS receives back is the exit code from the script. If more sophisticated returns are needed, you could write all output to a log and parse this within your driver submethod. A log is a good idea anyway, in case you ever need to track down an issue.
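Here is a rough sketch of that pattern: arguments in via argparse, an exit code out, and the details written to a log file the caller can parse. The program name `device_driver`, the `--log` option and the log line format are assumptions for illustration, and the device call itself is left as a placeholder.

```python
import argparse
import logging
import tempfile
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="device_driver")  # hypothetical name
    parser.add_argument("port", help="Serial port, e.g. COM13")
    parser.add_argument("action", choices=["seal", "set_temp"])
    parser.add_argument("value", nargs="?", type=int, help="Optional value, e.g. a temperature")
    parser.add_argument("--log", default="device_driver.log", help="Log file the caller can parse")
    return parser


def run(argv) -> int:
    """Return 0 on success, non-zero on failure; details go to the log file."""
    args = build_parser().parse_args(argv)
    logger = logging.getLogger("device_driver")
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(args.log, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    try:
        if args.action == "set_temp" and args.value is None:
            logger.error("set_temp requires a value")
            return 2
        # A real driver would talk to the device here.
        logger.info("port=%s action=%s value=%s result=OK", args.port, args.action, args.value)
        return 0
    finally:
        logger.removeHandler(handler)
        handler.close()


# Demo: the same argument lists a shell call would produce.
log_path = Path(tempfile.mkdtemp()) / "device_driver.log"
ok_code = run(["COM13", "set_temp", "160", "--log", str(log_path)])
bad_code = run(["COM13", "set_temp", "--log", str(log_path)])
print(ok_code, bad_code)
print(log_path.read_text(encoding="utf-8"))
```

In a real script you would replace the demo lines with `sys.exit(run(sys.argv[1:]))` so the return value becomes the process exit code that the calling software sees.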

We have built drivers for the A4S sealer, the XPeel desealer, the Hettich SBS300 and the LVL Decapper, all with Python.

E.g. for the sealer I usually pass the COM Port and whatever needs to be done. E.g. sealer_driver COM13 seal, sealer_driver COM13 set_temp 160 and so on. If needed I can also look up the lines in VENUS for calling this.

best
Dominik

3 Likes

This approach is perfectly valid. However, it’s mostly suited to sending short pieces of information such as COM ports. The Windows command line (cmd.exe) enforces a maximum command length of 8191 characters, which makes sending scripts or other large payloads tricky. In those cases it’s better to write the information to a file and have the driver read it.
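As a sketch of the file-based route: the command line carries only a short path, and the bulky parameters live in a file the driver reads. The `--params-file` option and the JSON layout are assumptions for illustration; a key/value CSV would work the same way if that is easier to write from your method.

```python
import argparse
import json
import tempfile
from pathlib import Path


def load_params(argv) -> dict:
    """Parse a short command line and load the bulky parameters from a JSON file."""
    parser = argparse.ArgumentParser(prog="device_driver")  # hypothetical name
    parser.add_argument("--params-file", required=True, type=Path)
    args = parser.parse_args(argv)
    with args.params_file.open(encoding="utf-8") as fh:
        return json.load(fh)


# Demo: the calling side writes the file, the driver reads it back.
params_path = Path(tempfile.mkdtemp()) / "run_params.json"
payload = {"port": "COM13", "wells": ["A1", "A2"], "script": "x" * 10000}
params_path.write_text(json.dumps(payload), encoding="utf-8")

params = load_params(["--params-file", str(params_path)])
print(params["port"], len(params["script"]))
```

The 10000-character `script` value would not fit on a cmd.exe command line, but it passes through the file without any trouble.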

1 Like

I think HTTP might be better here - run an external server that talks to your 3rd party instruments and use the Venus HTTP library to talk to that. Much easier to get return values and structured I/O.
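A minimal sketch of what such a server could look like, using only the Python standard library. The `/seal` endpoint, the JSON fields and the simulated device call are all assumptions for illustration; the client at the bottom stands in for whatever HTTP call the liquid handler software makes.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class DeviceHandler(BaseHTTPRequestHandler):
    """Maps POST /seal to a (simulated) device call and replies with JSON."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        if self.path == "/seal":
            # A real server would drive the instrument here.
            status, reply = 200, {"status": "sealed", "port": body.get("port")}
        else:
            status, reply = 404, {"error": f"unknown endpoint {self.path}"}
        payload = json.dumps(reply).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        pass  # keep the demo quiet


# Demo: start on a free local port, send one request, shut down.
server = ThreadingHTTPServer(("127.0.0.1", 0), DeviceHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
conn.request("POST", "/seal", body=json.dumps({"port": "COM13"}),
             headers={"Content-Type": "application/json"})
response = conn.getresponse()
result = json.loads(response.read())
conn.close()

server.shutdown()
server.server_close()
print(response.status, result)
```

Compared with exit codes, the caller gets a structured reply (status, values, error text) in a single round trip.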

2 Likes

Hi @dominik.b, thank you for the feedback! Could you (or anyone) provide an example of how to pass arguments like that using the argparse library coupled with the shell command? I am new to this approach and want to make sure I understand it properly before playing around.

For plate shakers and centrifuges, it does seem doable to control via Python and similar means; however, my other concern is plate readers, as those seem more complex than just sending simple commands to start the process. Not only does it require more parameters to pass through, but it also involves data extraction and wait time for when the plate reader is done reading. Any thoughts or advice on how to approach this?

Again, in the spirit of going from 0 → 1, plate readers are often easier than other devices. Usually you’ll configure the read parameters (i.e. wells, wavelengths, etc.) in the plate reader’s software and save that as some kind of protocol definition. For the automation interaction, your script will just send the plate reader’s software the name of the protocol definition. This, of course, depends on the specific plate reader software and interface.

You could then parse the output file based on the barcode, or a folder watcher, or a million other ways. I think the responses here are a good sampling of options, and it really depends on how much time you want to spend on making things flexible, extensible, logged, etc. The nice thing nowadays is that development is much, much faster and easier than it was even 2 years ago.
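One of those "million other ways", sketched with simple polling: wait until a result file whose name contains the plate barcode shows up in the export folder. The file naming pattern (`*<barcode>*.csv`) and the function name `wait_for_result` are assumptions; real reader software will have its own export conventions.

```python
import tempfile
import time
from pathlib import Path
from typing import Optional


def wait_for_result(folder: Path, barcode: str,
                    timeout_s: float = 60.0, poll_s: float = 0.5) -> Optional[Path]:
    """Poll `folder` until a CSV whose name contains `barcode` appears, or time out."""
    deadline = time.monotonic() + timeout_s
    while True:
        matches = sorted(folder.glob(f"*{barcode}*.csv"))
        if matches:
            return matches[-1]
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_s)


# Demo: one plate has a result file, the other never gets one.
folder = Path(tempfile.mkdtemp())
(folder / "PLATE0042_read.csv").write_text("well,value\nA1,0.123\n", encoding="utf-8")

found = wait_for_result(folder, "PLATE0042", timeout_s=2.0, poll_s=0.1)
missing = wait_for_result(folder, "PLATE9999", timeout_s=0.3, poll_s=0.1)
print(found.name, missing)
```

A production version would probably also check that the reader has finished writing the file before parsing it, for example by waiting until the file size stops changing.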

2 Likes

I second this even if you’re using Python to run the USB/Serial interface. I’ve had success integrating pressure sensors using this method.

I also recommend adding a “service interface” to the “server” as well, i.e. using a command line parameter to open a menu in the command window where testing and troubleshooting can be run independently of your method.
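A rough sketch of such a service menu, assuming a `--service` flag (my naming) and two placeholder actions. The `read` and `write` parameters exist only so the demo can feed scripted key presses; by default they are the normal `input` and `print`.

```python
import argparse


def service_menu(actions: dict, read=input, write=print) -> list:
    """Tiny text menu for manual testing; returns the names of the actions that ran."""
    names = list(actions)
    ran = []
    while True:
        for number, name in enumerate(names, start=1):
            write(f"{number}) {name}")
        write("q) quit")
        choice = read("> ").strip().lower()
        if choice == "q":
            return ran
        if choice.isdigit() and 1 <= int(choice) <= len(names):
            name = names[int(choice) - 1]
            write(f"{name}: {actions[name]()}")
            ran.append(name)
        else:
            write(f"Unknown choice: {choice!r}")


def main(argv, read=input, write=print) -> int:
    parser = argparse.ArgumentParser(prog="device_server")  # hypothetical name
    parser.add_argument("--service", action="store_true", help="Open the manual test menu")
    args = parser.parse_args(argv)
    # Placeholder actions; a real tool would call the device here.
    actions = {"read status": lambda: "idle", "open tray": lambda: "ok"}
    if args.service:
        service_menu(actions, read, write)
        return 0
    write("Normal server start would happen here.")
    return 0


# Demo with scripted key presses instead of a live keyboard.
keys = iter(["1", "x", "q"])
output = []
main(["--service"], read=lambda prompt: next(keys), write=output.append)
print("\n".join(output))
```

Because the menu calls the same functions the method would call, a failure seen during a run can be reproduced by hand without starting the method.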

Logging with robust error reporting is also an important feature to implement.

2 Likes

You can build connectors yourself using the UniteLabs Python CDK (open-source), which comes with a number of nice features:

  • SiLA 2 compliance out of the box
  • Templates for standard & core features so you don’t start from zero
  • Code-first approach, without needing manual XML definitions
  • Built-in testing tools and structured maintenance

You could then use a SiLA client within your LH software, or integrate via an external workflow engine/scheduler.

If you don’t want to start from scratch, there are already connectors for Hamilton STAR and 100+ other instruments (liquid handlers, readers, incubators, etc.).

I’d be happy to brainstorm how you could build one for your instrument if that would be useful.

3 Likes

This is the code we use for the A4S.

So basically we have the A4S_Sealer.py we can directly call from the shell like:

python A4S_Sealer.py seal -p COM13

Getting this to work in VENUS required some nasty string formatting; I’m sure there are cleaner ways to do this. Basically, we have an HSL block to generate the command, and then just send it via the Shell:

str_python_path = "\"C:\Users\Hamilton\AppData\Local\Programs\Python\Python311\python.exe\"" + " \"" + "C:\Users\Hamilton\Desktop\Metabarcoding on Vantage\source\sub method librarys\\sealer_controler\A4S_Sealer.py" + "\" seal -p COM" + StrIStr(i_int_com_port);

If you need logging you could either make the driver write a logfile with a timestamp or directly route stdout to the log.

import argparse
import sys
import time
import serial


class A4SController:
    def __init__(self, port='COM3', baudrate=19200, timeout=1.0, verify_seconds=2.5, skip_verify=False):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.verify_seconds = verify_seconds
        self.skip_verify = skip_verify
        self.serial_connection = None

    def connect(self):
        try:
            self.serial_connection = serial.Serial(
                self.port,
                self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout,
            )
            print(f"Connected to A4S device on {self.port}.")
        except Exception as e:
            print(f"Failed to connect to A4S on {self.port}: {e}")
            sys.exit(1)

        if not self.skip_verify:
            ok = self.verify_device(self.verify_seconds)
            if ok:
                print("Device fingerprint OK (A4S-style status detected).")
            else:
                print("WARNING: Did not see an A4S-style status line during verification.")
                print("         If you are sure this port is correct,")
                print("         rerun with --no-verify or increase --verify-seconds.")
                # Release the port before exiting so the next call can open it.
                self.close_connection()
                sys.exit(3)

    def close_connection(self):
        if self.serial_connection and self.serial_connection.is_open:
            self.serial_connection.close()
            print("Disconnected from A4S device.")

    def send_command(self, command: str):
        if self.serial_connection and self.serial_connection.is_open:
            try:
                self.serial_connection.write((command + '\r').encode('ascii'))
                time.sleep(0.2)
                return self.read_response()
            except Exception as e:
                print(f"Error sending command: {e}")
                return None
        else:
            print("Serial connection is not open.")
            return None

    def read_response(self):
        if self.serial_connection and self.serial_connection.is_open:
            try:
                data = self.serial_connection.read_until(b"\r")
                if not data:
                    return None
                return data.decode("ascii", errors="ignore").strip()
            except Exception as e:
                print(f"Error reading response: {e}")
                return None
        else:
            print("Serial connection is not open.")
            return None

    def set_temperature(self, temperature_c: int):
        if not (50 <= temperature_c <= 200):
            print("Error: Temperature must be between 50 and 200 °C.")
            return None
        temp_param = f"{temperature_c * 10:04d}"
        command = f"*00DH={temp_param}zz!"
        print(f"Setting temperature to {temperature_c} °C with command: {command}")
        return self.send_command(command)

    def send_seal_command(self):
        command = "*00GS=zz!"
        print("Sending seal command...")
        return self.send_command(command)

    # -------- Verification / status parsing --------
    def verify_device(self, probe_seconds: float = 2.5) -> bool:
        """Listen briefly for a recognizable A4S status line to confirm the right device."""
        if not (self.serial_connection and self.serial_connection.is_open):
            return False

        # Clear any stale bytes and give the device a moment to start streaming
        try:
            self.serial_connection.reset_input_buffer()
            self.serial_connection.reset_output_buffer()
        except Exception:
            pass

        deadline = time.time() + probe_seconds
        last_line = None
        while time.time() < deadline:
            data = self.serial_connection.read_until(b"\r")
            if not data:
                continue
            line = data.decode("ascii", errors="ignore").strip()
            last_line = line
            # Accept a typical A4S telemetry line like: "*T...=temp,sys,heater,err,warn"
            if self._parse_status_line(line) is not None:
                return True

        if last_line:
            print(f"Last line received during verify: {last_line}")
        return False

    @staticmethod
    def _parse_status_line(line: str):
        # Expected: '*T...=temp_tenths,sys,heater,err,warn'
        if not line or not line.startswith("*T"):
            return None
        try:
            left, right = line.split("=", 1)
            parts = right.split(",")
            if len(parts) < 5:
                return None
            temp_tenths = int(parts[0])
            sys_state = parts[1]  # '0' -> Idle
            heater_state = parts[2]
            err_code = parts[3]
            warn_code = parts[4]
            return {
                "timestamp_raw": left.replace("T", ""),
                "temperature_c": temp_tenths / 10.0,
                "is_idle": (sys_state == "0"),
                "heater_state_raw": heater_state,
                "error_code": err_code,
                "warning_code": warn_code,
                "raw": line,
            }
        except Exception:
            return None

    def wait_for_idle(self, timeout_sec: int = 120, require_active_transition: bool = True):
        if not (self.serial_connection and self.serial_connection.is_open):
            print("Serial connection is not open.")
            return False

        deadline = time.time() + timeout_sec
        saw_active = False

        while time.time() < deadline:
            try:
                line_bytes = self.serial_connection.read_until(b"\r")
            except Exception as e:
                print(f"Error while waiting for status: {e}")
                return False

            if not line_bytes:
                continue

            line = line_bytes.decode("ascii", errors="ignore").strip()
            status = self._parse_status_line(line)
            if status is None:
                continue

            if not status["is_idle"]:
                saw_active = True

            if status["is_idle"]:
                if require_active_transition:
                    if saw_active:
                        return True
                else:
                    return True

        return False

    def read_status_once(self):
        print("Reading device status...")
        for _ in range(3):
            raw = self.read_response()
            if not raw:
                continue
            parsed = self._parse_status_line(raw)
            if parsed:
                formatted = (
                    f"Timestamp: {parsed['timestamp_raw']}\n"
                    f"Current Temperature: {parsed['temperature_c']} °C\n"
                    f"System Status: {'Idle' if parsed['is_idle'] else 'Active'}\n"
                    f"Heater: {'Off' if parsed['heater_state_raw'] == '0' else 'Ready'}\n"
                    f"Error Code: {parsed['error_code']}\n"
                    f"Warning Code: {parsed['warning_code']}\n"
                )
            else:
                formatted = f"Raw Data: {raw}"
            print(formatted)


def temperature_type(s: str) -> int:
    try:
        v = int(s)
    except ValueError:
        raise argparse.ArgumentTypeError("temperature must be an integer (°C)")
    if not (50 <= v <= 200):
        raise argparse.ArgumentTypeError("temperature must be between 50 and 200 °C")
    return v


def build_parser():
    parent = argparse.ArgumentParser(add_help=False)
    parent.add_argument("-p", "--port", required=True, help="Serial port (e.g., COM5 or /dev/ttyUSB0)")
    parent.add_argument("--baudrate", type=int, default=19200, help="Baud rate (default: 19200)")
    parent.add_argument("--timeout", type=float, default=1.0, help="Read timeout in seconds (default: 1.0)")
    parent.add_argument("--no-verify", action="store_true", help="Skip post-connect device verification")
    parent.add_argument("--verify-seconds", type=float, default=2.5,
                        help="Seconds to listen for status during verification (default: 2.5)")

    parser = argparse.ArgumentParser(prog="A4S_Sealer.py", description="A4S Heat Sealer Controller")
    sub = parser.add_subparsers(dest="cmd", required=True)

    # seal
    p_seal = sub.add_parser("seal", parents=[parent], help="Send seal command and wait until Idle.")
    p_seal.add_argument("--wait-timeout", type=int, default=120,
                        help="Timeout (seconds) for waiting for Idle (default: 120)")
    p_seal.add_argument("--no-active-check", action="store_true",
                        help="Do not require seeing Active before confirming Idle")

    # set-temp
    p_temp = sub.add_parser("set-temp", parents=[parent], help="Set block temperature (°C).")
    p_temp.add_argument("temperature", type=temperature_type, metavar="C",
                        help="Target temperature (50–200 °C)")

    return parser


def main():
    parser = build_parser()
    args = parser.parse_args()

    a4s = A4SController(
        port=args.port,
        baudrate=getattr(args, "baudrate", 19200),
        timeout=getattr(args, "timeout", 1.0),
        verify_seconds=getattr(args, "verify_seconds", 2.5),
        skip_verify=getattr(args, "no_verify", False),
    )
    a4s.connect()
    try:
        if args.cmd == "seal":
            resp = a4s.send_seal_command()
            print(f"Seal Command Response: {resp}")
            ok = a4s.wait_for_idle(
                timeout_sec=args.wait_timeout,
                require_active_transition=(not args.no_active_check),
            )
            if ok:
                print("sealed")
            else:
                print("Error: seal did not complete (timeout waiting for Idle).")
                sys.exit(2)

        elif args.cmd == "set-temp":
            resp = a4s.set_temperature(args.temperature)
            print(f"Set Temperature Response: {resp}")

    finally:
        a4s.close_connection()


if __name__ == "__main__":
    main()

4 Likes

I want to thank everyone for the tips and tricks so far!

That being said, while I am exploring integrating via Python, I am curious about what instruments are “Python-friendly” in that regard. For instance, I am currently playing around with controlling a BioShaker via PySerial, given that the integration manual is available for reference. But I was wondering if that’s the same (similar or simpler) for other devices like plate readers and centrifuges. I don’t have either yet to see for myself, hence I would like to hear if anyone has any recommendations based on what they worked with before.

As long as it can be controlled via a serial port, you can use it with PySerial. So far we have done the A4S, XPeel, Hettich SBS300 and LVL Capper/Decapper.

2 Likes

We have been building these kinds of “drivers” for a while. We do this as a service, and we build them using Python. You can connect to pretty much anything using Python. We’ve done serial, .NET, HTTP, anything. You don’t need the instrument to be “Python friendly” because Python provides libraries to connect to anything.

I agree that these days it’s pretty straightforward using ChatGPT and the like. In most cases you will get working code.

As others have said we use command line to pass arguments to the driver. Since we write the driver for a specific customer we tend not to provide the full API, just the commands they use and we make them friendly.
So for example for a plate reader there might be 2 command line params:
Reader.py -o
Connects to the plate reader, issues the open command, and maybe waits until the tray is open.
Reader.py -r
Connects again, closes the tray, initiates the read, waits until the read is finished, opens the tray again, and returns 0 if all is OK or 1 on error (via sys.exit(code)).
This return value can be read in virtually any robot system that can issue a command line executable.
Even parsing the reader results and returning them in a CSV is very easy in Python.
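The two-flag pattern above could be sketched like this. `SimulatedReader` and its methods are a stand-in I made up, not a real plate reader API; a real driver would replace it with the vendor's serial, .NET or HTTP calls.

```python
import argparse
import csv
import io


class SimulatedReader:
    """Stand-in for a real plate reader connection (hypothetical API)."""

    def __init__(self):
        self.tray_open = False

    def open_tray(self):
        self.tray_open = True

    def close_tray(self):
        self.tray_open = False

    def read_plate(self):
        if self.tray_open:
            raise RuntimeError("cannot read with the tray open")
        return {"A1": 0.123, "A2": 0.456}


def main(argv, reader=None, out=None) -> int:
    parser = argparse.ArgumentParser(prog="Reader.py")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-o", "--open", action="store_true", help="Open the tray")
    group.add_argument("-r", "--read", action="store_true", help="Close, read, reopen")
    args = parser.parse_args(argv)
    reader = reader or SimulatedReader()
    try:
        if args.open:
            reader.open_tray()
        else:
            reader.close_tray()
            results = reader.read_plate()
            reader.open_tray()
            if out is not None:
                # Write the results as CSV for the calling software to pick up.
                writer = csv.writer(out, lineterminator="\n")
                writer.writerow(["well", "value"])
                writer.writerows(results.items())
        return 0  # all OK
    except Exception as exc:
        print(f"Reader error: {exc}")
        return 1  # any failure


# Demo: run the read command and capture the CSV in memory.
buffer = io.StringIO()
code = main(["-r"], out=buffer)
print(code)
print(buffer.getvalue())
```

In a real script the last lines would be replaced by `sys.exit(main(sys.argv[1:], out=some_open_file))`, so the robot software sees 0 or 1 as the process exit code.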

6 Likes

We have a bunch in PyLabRobot:

3 Likes

Hi @Nat,

Simple answer: anything from an “integrator-first” Original Equipment Manufacturer (OEM)

When integrating a new machine you have multiple options that can be over-simplistically classified like this:

  1. OEM provides a high-quality API:
    high-quality or well-developed means it gives access to all machine features, is OS-agnostic, is open-source, and its dependencies won’t cause compatibility issues with your other libraries - or drag you back into Windows Vista …

  2. OEM provides firmware command set == “integrator-first” OEM:
    this allows you to write your own high-quality driver/backend in hours!
    These are, in my opinion, the best companies in this space, they build excellent machines and have built their business model on it; I feel I can name some without angering people (only positive vibes here :blush: ): Inheco, Brooks Automation, Azenta, Mettler Toledo, …
    We’ve found this to be even faster than using a high-quality API (of which there are not many).
    We now exclusively purchase machines from “integrator-first” companies - except where there’s truly no alternative.

  3. OEM provides nothing:
    You are forced to use the GUI of the OEM? Maybe even be asked to pay a lot of money for it? It cannot do the job you need to do? You cannot switch to another machine? You really have to figure out how to use it still? → you are now forced to reverse engineer the firmware to build a driver/backend on top of it; this can be very time-consuming, depending on the complexity of the machine… would be great if there were an MIT-licensed open-source library that provided a list of pre-made integrations - along with a global community of dedicated developers helping identify and fix bugs in this category :eyes:

Noteworthy mention:
SiLA (Standardisation in Lab Automation) was meant to solve this problem by convincing OEMs to write gRPC contracts into their machines. In theory, you’d just need the .proto contract file and generator to get a working Python driver.
But there are many issues:

  • Finding the contract and generator file is a huge pain and sometimes impossible.
  • The concept behind SiLA is “open”/“free”, but the actual drivers might not be; many seem to cost a ridiculous fortune in themselves (and as a result aren’t shareable, destroying the ability to share automated protocols with collaborators who can’t afford to pay what you already did).
  • Most/many OEMs don’t adhere to it.
  • The ones that do often don’t expose the full set of machine features (i.e. creating low-quality drivers/backends).
  • Server setups can vary massively.
  • The underlying licenses are often unclear (potentially creating IP liabilities).
  • The underlying codebase is hard to find and even harder to modify (a consortium has been created to oversee it, becoming a member of which costs between 3,850-15,400 EUR per year).
  • …


As a result of all this - and after seeing company after company spend huge amounts of time and money rebuilding existing drivers in isolation - we chose to build on top of and expand PyLabRobot. :slight_smile:
After all, we’re not in the business of selling automation - we’re in the business of producing a biological product.
Automation is just a means to that end.


It seems you are forced to use VENUS. I believe you can prototype a mixture of both right now using this excellent suggestion:

Write a small HTTP server based on PyLabRobot for any of the available machines there.
Use this HTTP Library in VENUS → a nice fusion of old and new which should yield results in hours to days.
If the PyLabRobot integration is not yet able to do what you need it to do, you can update it… but crucially, you won’t start from scratch nor pay for anything (with money).

Hope this helps :slight_smile:

Please correct any of my statements if you find a mistake!

4 Likes

@rickwierenga Funnily enough, I have already started diving into PLR as we speak, and I also made an account on the PLR discussion forum :slightly_smiling_face:

Also, thank you @CamilloMoschner for the in-depth explanation!! I have heard of SiLA but couldn’t wrap my head around it until now, so thank you for the heads up on its limitations. As for my integration, I wouldn’t say that I am forced to use VENUS; rather, I am more familiar with it than I am with any backend coding (Python included). Hence, my purpose is to integrate it into VENUS by executing a Python script, as you and others have mentioned in this forum.

Another question that came up in that regard: in order to use PLR, I would need the respective drivers of the instrument for the backend coding of PLR to communicate with the frontend device GUI, correct? For example, for the VSpin centrifuge, I would also need VWorks in order to use PLR. I’m asking simply because I am working on integrating the QInstruments BioShake via pySerial. I noticed that I do not need to have Q-COM installed beforehand, but then again, it seems like the BioShake falls into the “OEM provides a high-quality API” category you described.

2 Likes