Async with DbSession

Up to this point we’ve used Connection directly — a ref object, non-sendable, synchronous. Fine for a script-shaped program. Awkward in a multi-actor program: only one actor can own the connection, and every call blocks its scheduler thread for the ODBC call’s duration.

DbSession is an actor that wraps a Connection and exposes its operations as behaviours. Each behaviour takes a Promise that’s fulfilled when the operation completes.

actor DbSession
  new create(dsn: Dsn, validate_utf8: Bool = true)

  be exec(sql: String val,
    promise: Promise[(RowCount | ExecError)])

  be query(sql: String val,
    promise: Promise[(Array[Row val] val | ExecError)])

  be begin(promise: Promise[(TxBegun | TxBeginError)])
  be commit(promise: Promise[(TxCommitted | TxCommitError)])
  be rollback(promise: Promise[(TxRolledBack | TxRollbackError)])

  be close()

Shape follows Connection one-for-one, with two differences:

  • Results come back through Promises, not synchronous returns.
  • query returns all rows as a sendable Array[Row val] val rather than a non-sendable Cursor.

Cursor and Statement are ref — unsendable. So DbSession.query buffers the whole result set into memory. For very large result sets that’s the wrong trade; build an actor per query with the cursor private. For ordinary sets it’s simpler and good enough.
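As a rough sketch of the simplest case, the program below sends one query and then close(). It assumes the same odbc package and default DSN name (psqlred) as the sample later on this page. SELECT 1 is a placeholder statement that not every backend accepts, and _got is a hypothetical handler name.

```pony
use "lib:odbc"
use "odbc"
use "promises"

actor Main
  let _env: Env

  new create(env: Env) =>
    _env = env

    // Assumption: DSN name from argv, defaulting to the tutorial's "psqlred".
    let dsn_name =
      try env.args(1)?
      else "psqlred"
      end
    let db = DbSession(Dsn("DSN=" + dsn_name))

    // The caller creates the promise and attaches a handler to it.
    let p = Promise[(Array[Row val] val | ExecError)]
    p.next[None](recover this~_got() end)

    // Placeholder SQL; substitute a statement your database accepts.
    db.query("SELECT 1", p)
    // Sent after the query, so the session handles it afterwards.
    db.close()

  // Hypothetical handler: receives the whole buffered result set at once.
  be _got(result: (Array[Row val] val | ExecError)) =>
    match result
    | let rows: Array[Row val] val =>
      _env.out.print("buffered " + rows.size().string() + " row(s)")
    | let e: ExecError =>
      _env.err.print("query: " + e.string())
    end
```

The handler sees the complete array in one message, which is the buffering trade-off described above: convenient for ordinary result sets, costly for very large ones.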

What DbSession doesn’t expose

prepare isn’t exposed as a behaviour, and Statement never crosses the actor boundary: a Statement is tied to its Connection and, being ref, can’t be sent out. If you need prepared statements inside an actor, build a custom actor that owns its own Connection and Statements privately.

A chained workflow

Promises compose. The sample does DROP → CREATE → INSERT → SELECT → DROP, each step triggered by the previous promise’s fulfillment:

use "lib:odbc"
use "odbc"
use "promises"

actor Main
  let _env: Env
  let _db: DbSession

  new create(env: Env) =>
    _env = env

    let dsn_name =
      try env.args(1)?
      else "psqlred"
      end

    _db = DbSession(Dsn("DSN=" + dsn_name))

    // Chain: DROP -> CREATE -> INSERT -> SELECT -> DROP -> close.
    let drop_p = Promise[(RowCount | ExecError)]
    _db.exec("DROP TABLE IF EXISTS tut_session", drop_p)

    let ct_p = Promise[(RowCount | ExecError)]
    drop_p.next[None](recover this~_after_drop(ct_p) end)

    let ins_p = Promise[(RowCount | ExecError)]
    ct_p.next[None](recover this~_after_create(ins_p) end)

    let sel_p = Promise[(Array[Row val] val | ExecError)]
    ins_p.next[None](recover this~_after_insert(sel_p) end)

    sel_p.next[None](recover this~_after_select() end)

  be _after_drop(next_p: Promise[(RowCount | ExecError)],
    result: (RowCount | ExecError))
  =>
    _log_exec("drop", result)
    _db.exec(
      "CREATE TABLE tut_session (id INTEGER, label VARCHAR(32))",
      next_p)

  be _after_create(next_p: Promise[(RowCount | ExecError)],
    result: (RowCount | ExecError))
  =>
    _log_exec("create", result)
    _db.exec(
      "INSERT INTO tut_session VALUES (1, 'alpha'), (2, 'bravo')",
      next_p)

  be _after_insert(
    next_p: Promise[(Array[Row val] val | ExecError)],
    result: (RowCount | ExecError))
  =>
    _log_exec("insert", result)
    _db.query(
      "SELECT id, label FROM tut_session ORDER BY id", next_p)

  be _after_select(result: (Array[Row val] val | ExecError)) =>
    match result
    | let rows: Array[Row val] val =>
      for row in rows.values() do
        try
          let id =
            match \exhaustive\ row.int(ColIndex(1))?
            | let v: I64 => v.string()
            | SqlNull => "NULL"
            end
          let label =
            match \exhaustive\ row.text(ColIndex(2))?
            | let v: String val => v
            | SqlNull => "NULL"
            end
          _env.out.print(id + " " + label)
        end
      end
    | let e: ExecError =>
      _env.err.print("select: " + e.string())
    end

    // Drop the table, then close the session once the DROP completes.
    let cleanup = Promise[(RowCount | ExecError)]
    _db.exec("DROP TABLE IF EXISTS tut_session", cleanup)
    cleanup.next[None](recover this~_after_cleanup() end)

  be _after_cleanup(result: (RowCount | ExecError)) =>
    _log_exec("cleanup", result)
    _db.close()

  fun _log_exec(label: String val, result: (RowCount | ExecError)) =>
    match result
    | let n: USize =>
      _env.out.print(label + ": " + n.string() + " rows")
    | NoRowCount =>
      _env.out.print(label + ": (no row count)")
    | let e: ExecError =>
      _env.err.print(label + ": " + e.string())
    end

Running the built sample:

./build/11-dbsession
drop: 0 rows
create: 0 rows
insert: 2 rows
1 alpha
2 bravo
cleanup: 0 rows

Why pass promises in, rather than returning them?

DbSession’s behaviours take a Promise as a parameter rather than handing one back. That buys you two things:

  • Callers decide what the result becomes: consume the Promise[Result] as it is, or derive a Promise[MyCustomType] from it with .next[MyCustomType], which can be set up before the behaviour has run.
  • The handoff is visible: let p = Promise[...]; db.exec(sql, p) reads more clearly than a chained fluent call.

Minor, but worth recognising when you chain behaviours.
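A minimal sketch of the first point, assuming the same odbc package and default DSN name as the sample above. The names raw, summary and _report are hypothetical. The caller derives a Promise[String] from the raw result promise with .next[String], then hands the raw promise to exec.

```pony
use "lib:odbc"
use "odbc"
use "promises"

actor Main
  let _env: Env

  new create(env: Env) =>
    _env = env

    // Assumption: DSN name from argv, defaulting to the tutorial's "psqlred".
    let dsn_name =
      try env.args(1)?
      else "psqlred"
      end
    let db = DbSession(Dsn("DSN=" + dsn_name))

    // The promise the session will fulfil.
    let raw = Promise[(RowCount | ExecError)]

    // Derived promise: maps the raw result to a caller-chosen type.
    let summary: Promise[String] =
      raw.next[String]({(r: (RowCount | ExecError)): String =>
        match r
        | let e: ExecError => "failed: " + e.string()
        else
          "ok"
        end
      } iso)
    summary.next[None](recover this~_report() end)

    db.exec("DROP TABLE IF EXISTS tut_session", raw)
    db.close()

  // Hypothetical handler: only ever sees the derived String.
  be _report(text: String) =>
    _env.out.print(text)
```

The session only knows about raw. Everything downstream of it, including the String conversion, is the caller's choice.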

Closing down cleanly

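close() is a behaviour, so it is queued behind the operations already sent to the session. Send it last and the connection shuts down once the final operation’s promise is fulfilled. In a promise chain, "last" means after the final step has been issued: a close() sent from the constructor would land ahead of every step that a later callback has yet to send.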

The sample’s _after_cleanup does exactly that: it fires the final close() after the last DROP’s promise resolves, so the session processes its queue in order and exits.
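The ordering this relies on can be seen without a database. The sketch below uses a stand-in Session actor (hypothetical, not the odbc package's DbSession) whose behaviours only print. Pony delivers messages from one sender to one receiver in the order they were sent, so close() runs after both work messages.

```pony
// Stand-in for a session actor; NOT the odbc package's DbSession.
actor Session
  let _out: OutStream

  new create(out: OutStream) =>
    _out = out

  be work(label: String) =>
    _out.print("work: " + label)

  be close() =>
    _out.print("closed")

actor Main
  new create(env: Env) =>
    let s = Session(env.out)
    s.work("first")
    s.work("second")
    // Sent last by the same sender, so it is processed last.
    s.close()
```

This prints "work: first", "work: second", then "closed". The guarantee covers only messages already sent, which is why the sample waits until _after_cleanup before sending close().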