Thought I’d share a really simple example of error recovery so folks could see how to implement it in their scripts. Let me know if this is something you find useful!
This is basically just the template script that you create with pyhamilton-new-project
with some small functions added
active_column = 0
recovery_state = {'active_column': active_column}
def recover_state(file):
with open(file, "r") as read_content:
state = json.load(read_content)
return state
def write_state(file, state):
with open(file, "w") as write_file:
json.dump(state, write_file)
if '--error-recovery' in sys.argv:
recovered_state = recover_state('state.json')
active_column = recovered_state['active_column']
and inside the command loop we wish to recover from:
active_column += 1
recovery_state.update({'active_column': active_column})
write_state('state.json', recovery_state)
The whole script together:
import os
from pyhamilton import (HamiltonInterface, LayoutManager,
Plate96, Tip96, initialize, tip_pick_up, tip_eject,
aspirate, dispense, oemerr, resource_list_with_prefix, normal_logging)
import json
import sys
liq_class = 'StandardVolumeFilter_Water_DispenseJet_Empty'
lmgr = LayoutManager('deck.lay')
plates = resource_list_with_prefix(lmgr, 'plate_', Plate96, 5)
tips = resource_list_with_prefix(lmgr, 'tips_', Tip96, 1)
liq_class = 'StandardVolumeFilter_Water_DispenseJet_Empty'
number_columns = 12
aspiration_poss_cols = [[(plates[1], well) for well in range(8*column, 8*column+8)] for column in range(number_columns)]
dispense_poss_cols = [[(plates[0], well) for well in range(8*column, 8*column+8)] for column in range(number_columns)]
vols_list = [30]*8
tips_poss_cols = [[(tips[0], well) for well in range(8*column, 8*column+8)] for column in range(12)]
active_column = 0
recovery_state = {'active_column': active_column}
def recover_state(file):
with open(file, "r") as read_content:
state = json.load(read_content)
return state
def write_state(file, state):
with open(file, "w") as write_file:
json.dump(state, write_file)
if '--error-recovery' in sys.argv:
recovered_state = recover_state('state.json')
active_column = recovered_state['active_column']
if __name__ == '__main__':
with HamiltonInterface(simulate=True) as ham_int:
normal_logging(ham_int, os.getcwd())
initialize(ham_int)
while active_column <= 12:
tip_pick_up(ham_int, tips_poss_cols[active_column])
aspirate(ham_int, aspiration_poss_cols[active_column], vols_list, liquidClass = liq_class)
dispense(ham_int, dispense_poss_cols[active_column], vols_list, liquidClass = liq_class)
tip_eject(ham_int, tips_poss_cols[active_column])
active_column += 1
recovery_state.update({'active_column': active_column})
write_state('state.json', recovery_state)