Implementing SUSI Linux App as a Finite State Machine

The SUSI Linux app provides access to SUSI on desktop Linux distributions as well as on hardware devices like the Raspberry Pi. It is a headless client that can be used to interact with SUSI via voice only. As more features, such as support for multiple hotword detection engines and a wake button, were added to SUSI Linux, the code became complex to understand and manage. A system was needed to model the app after, and a Finite State Machine (FSM) is a good fit for such a system.

The Wikipedia definition of a State Machine is:

“It is an abstract machine that can be in exactly one of a finite number of states at any given time. The FSM can change from one state to another in response to some external inputs; the change from one state to another is called a transition.”

This means that if you can model your app as a finite number of states, a State Machine implementation is worth considering.

A State Machine implementation has the following advantages:

  • Better control over the working of the app.
  • Improved error handling, through a dedicated Error State that handles errors.
  • States work independently, which helps to modularize the code.

To begin with, we declare an abstract State class. This class declares the properties common to all states and a transition method.

from abc import ABC, abstractmethod
import logging


class State(ABC):
    def __init__(self, components):
        # Shared components (recognizer, microphone, ...) used by every state.
        self.components = components
        # Maps a transition name to the State instance it leads to.
        self.allowedStateTransitions = {}

    @abstractmethod
    def on_enter(self, payload=None):
        pass

    @abstractmethod
    def on_exit(self):
        pass

    def transition(self, state, payload=None):
        # Refuse to move to a state that is not in the allowed set.
        if not self.__can_transition(state):
            logging.warning("Invalid transition to State {0}".format(state))
            return

        self.on_exit()
        state.on_enter(payload)

    def __can_transition(self, state):
        return state in self.allowedStateTransitions.values()

We declared the on_enter() and on_exit() abstract methods. They are executed on entering and exiting a state, respectively. A state performs its designated task in on_enter(), and frees up resources or stops listening to callbacks in on_exit(). The transition() method moves the machine from one state to another. In a state machine, a state can transition only to one of its allowed states, so we check whether the transition is allowed before carrying it out. The on_enter() and transition() methods additionally accept a payload argument, which can be used to pass data from the previous state to the next one.
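To see how the transition check and the payload behave, here is a minimal, self-contained sketch. It repeats a trimmed-down copy of the State class so that it runs on its own. RecordingState is a hypothetical state invented for this illustration and is not part of SUSI Linux.

```python
from abc import ABC, abstractmethod
import logging


class State(ABC):
    """Trimmed-down copy of the base class above, so the sketch runs alone."""

    def __init__(self, components=None):
        self.components = components
        self.allowedStateTransitions = {}

    @abstractmethod
    def on_enter(self, payload=None):
        pass

    @abstractmethod
    def on_exit(self):
        pass

    def transition(self, state, payload=None):
        if state not in self.allowedStateTransitions.values():
            logging.warning("Invalid transition to %s", type(state).__name__)
            return
        self.on_exit()
        state.on_enter(payload)


class RecordingState(State):
    """Hypothetical state that only records the calls it receives."""

    def __init__(self, name, log):
        super().__init__()
        self.name = name
        self.log = log

    def on_enter(self, payload=None):
        self.log.append((self.name, "enter", payload))

    def on_exit(self):
        self.log.append((self.name, "exit", None))


log = []
first = RecordingState("first", log)
second = RecordingState("second", log)
first.allowedStateTransitions = {"second": second}  # second allows nothing

first.transition(second, payload="hello")     # allowed: exit first, enter second
second.transition(first, payload="ignored")   # rejected: only a warning is logged

print(log)
```

Only the first call changes anything: the log contains the exit of the first state followed by the entry into the second state with the payload. The rejected call leaves no trace in the log because neither on_exit() nor on_enter() runs.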

We also added the components property to State. It holds the shared components that are used across all states and need to be initialized only once. We create a Components class declaring all the components that the states need.

import json_config
from speech_recognition import Recognizer, Microphone
# Assumption: the SUSI API wrapper is importable under this name.
import susi_python as susi


class Components:

    def __init__(self):
        # Use a fixed energy threshold instead of adapting to ambient noise.
        recognizer = Recognizer()
        recognizer.dynamic_energy_threshold = False
        recognizer.energy_threshold = 1000
        self.recognizer = recognizer
        self.microphone = Microphone()
        self.susi = susi
        self.config = json_config.connect('config.json')

        # Pick the hotword engine named in the configuration.
        if self.config['hotword_engine'] == 'Snowboy':
            from main.hotword_engine import SnowboyDetector
            self.hotword_detector = SnowboyDetector()
        else:
            from main.hotword_engine import PocketSphinxDetector
            self.hotword_detector = PocketSphinxDetector()

        # The wake button is only available on the Raspberry Pi.
        if self.config['wake_button'] == 'enabled':
            if self.config['device'] == 'RaspberryPi':
                from ..hardware_components import RaspberryPiWakeButton
                self.wake_button = RaspberryPiWakeButton()
            else:
                self.wake_button = None
        else:
            self.wake_button = None

Now, we list all the states that we need to implement in our app:

  • Idle State: The app is listening for the hotword or the wake button.
  • Recognizing State: The app actively records audio from the microphone and performs speech recognition.
  • Busy State: The SUSI API is called for the response to the query and the reply is spoken.
  • Error State: Upon any error in the above states, control transfers to the Error State. This state speaks the appropriate error message and then moves the machine to the Idle State.
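The four states and the moves between them can also be written down as a plain lookup table. The sketch below is only an illustration: the lowercase state names are labels chosen here, the allowed moves mirror the ones configured in the state machine class later in this post, and is_valid_path is a hypothetical helper, not a function from SUSI Linux.

```python
# Allowed moves between states, written as a lookup table.
ALLOWED_TRANSITIONS = {
    "idle": {"recognizing", "error"},
    "recognizing": {"busy", "error"},
    "busy": {"idle", "error"},
    "error": {"idle"},
}


def is_valid_path(path):
    """Return True if every consecutive pair of states in path is an allowed move."""
    return all(
        nxt in ALLOWED_TRANSITIONS[cur]
        for cur, nxt in zip(path, path[1:])
    )


# A normal query: hotword detected, speech recognized, reply spoken.
print(is_valid_path(["idle", "recognizing", "busy", "idle"]))
# Recovery after a failed recognition.
print(is_valid_path(["idle", "recognizing", "error", "idle"]))
# Skipping the Recognizing State is not allowed.
print(is_valid_path(["idle", "busy"]))
```

Writing the table out like this makes it easy to check that every state has a way back to Idle and that the Error State is reachable from each working state.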

Each state is implemented by inheriting from the base State class and overriding on_enter() and on_exit() with the behavior for that state.
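As a rough sketch of what such a subclass might look like, the example below implements simplified Idle and Error states. These are stand-ins, not the SUSI Linux implementations: the events list replaces the real components (nothing is actually spoken or recorded), and passing the error description as the payload is an assumption made for this illustration. The base class is a compact copy so the example runs on its own.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class State(ABC):
    """Compact stand-in for the base State class."""

    def __init__(self, components):
        self.components = components
        self.allowedStateTransitions = {}

    @abstractmethod
    def on_enter(self, payload=None): ...

    @abstractmethod
    def on_exit(self): ...

    def transition(self, state, payload=None):
        if state in self.allowedStateTransitions.values():
            self.on_exit()
            state.on_enter(payload)


class IdleState(State):
    def on_enter(self, payload=None):
        self.components.events.append("idle: waiting for hotword")

    def on_exit(self):
        self.components.events.append("idle: stopped waiting")


class ErrorState(State):
    def on_enter(self, payload=None):
        # Assumption: the payload carries a description of the error.
        message = payload or "Unknown error"
        self.components.events.append("error: speaking '{0}'".format(message))
        # After reporting the error, hand control back to the Idle State.
        self.transition(self.allowedStateTransitions["idle"])

    def on_exit(self):
        self.components.events.append("error: done")


# Hypothetical shared components: just a list that records what happened.
components = SimpleNamespace(events=[])
idle = IdleState(components)
error = ErrorState(components)
idle.allowedStateTransitions = {"error": error}
error.allowedStateTransitions = {"idle": idle}

idle.transition(error, payload="Microphone not found")
print(components.events)
```

The recorded events show the full round trip: Idle exits, Error reports the message, Error exits, and Idle is entered again.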

We also declare a SusiStateMachine class to store the current state and declare the valid transitions for all the states.

class SusiStateMachine:
    def __init__(self):
        # All states share a single Components instance.
        components = Components()
        self.__idle_state = IdleState(components)
        self.__recognizing_state = RecognizingState(components)
        self.__busy_state = BusyState(components)
        self.__error_state = ErrorState(components)
        self.current_state = self.__idle_state

        # Declare which states each state is allowed to move to.
        self.__idle_state.allowedStateTransitions = \
            {'recognizing': self.__recognizing_state, 'error': self.__error_state}
        self.__recognizing_state.allowedStateTransitions = \
            {'busy': self.__busy_state, 'error': self.__error_state}
        self.__busy_state.allowedStateTransitions = \
            {'idle': self.__idle_state, 'error': self.__error_state}
        self.__error_state.allowedStateTransitions = \
            {'idle': self.__idle_state}

        # Start the machine by entering the Idle State.
        self.current_state.on_enter(payload=None)

We also set the Idle State as the current state of the system and enter it. This is how the State Machine approach is implemented in SUSI Linux.
