Python – SQLAlchethe session management in long-running process

pythonsqlalchemywonderware

Scenario:

  • A .NET-based application server (Wonderware IAS/System Platform) hosts automation objects that communicate with various equipment on the factory floor.
  • CPython is hosted inside this application server (using Python for .NET).
  • The automation objects have scripting functionality built-in (using a custom, .NET-based language). These scripts call Python functions.

The Python functions are part of a system to track Work-In-Progress on the factory floor. The purpose of the system is to track the produced widgets along the process, ensure that the widgets go through the process in the correct order, and check that certain conditions are met along the process. The widget production history and widget state is stored in a relational database, this is where SQLAlchemy plays its part.

For example, when a widget passes a scanner, the automation software triggers the following script (written in the application server's custom scripting language):

' wiget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
    ProductionLine.Running = False;
    InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
    InformationBoard.SoundAlarm = True
end if;

The script calls the WidgetScanned python function:

# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime

def WidgetScanned(widget_id, scanner_id):
    widget = session.query(Widget).get(widget_id)
    validation.validate_widget_passed_scanner(widget, scanner) # raises exception on error

    widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
    widget.last_scanner = scanner_id
    widget.last_update = datetime.now()

    return StatusMessage("OK")

# ... there are a dozen similar functions

My question is: How do I best manage SQLAlchemy sessions in this scenario? The application server is a long-running process, typically running months between restarts. The application server is single-threaded.

Currently, I do it the following way:

I apply a decorator to the functions I make avaliable to the application server:

# pywip/iasfunctions.py
from pywip import functions

def ias_session_handling(func):
    def _ias_session_handling(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            session.rollback()
            raise
    return _ias_session_handling

# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)

Question: Is the decorator above suitable for handling sessions in a long-running process? Should I call session.remove()?

The SQLAlchemy session object is a scoped session:

# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker

session = scoped_session(sessionmaker())

I want to keep the session management out of the basic functions. For two reasons:

  1. There is another family of functions, sequence functions. The sequence functions call several of the basic functions. One sequence function should equal one database transaction.
  2. I need to be able to use the library from other environments. a) From a TurboGears web application. In that case, session management is done by TurboGears. b) From an IPython shell. In that case, commit/rollback will be explicit.

(I am truly sorry for the long question. But I felt I needed to explain the scenario. Perhaps not necessary?)

Best Answer

The described decorator is suitable for long running applications, but you can run into trouble if you accidentally share objects between requests. To make the errors appear earlier and not corrupt anything it is better to discard the session with session.remove().

try:
    try:
        retval = func(*args, **kwargs)
        session.commit()
        return retval
    except:
        session.rollback()
        raise
finally:
    session.remove()

Or if you can use the with context manager:

try:
    with session.registry().transaction:
        return func(*args, **kwargs)
finally:
    session.remove()

By the way, you might want to use .with_lockmode('update') on the query so your validate doesn't run on stale data.