Async with DbSession¶
Up to this point we’ve used Connection directly — a ref object, non-sendable, synchronous. Fine for a script-shaped program. Awkward in a multi-actor program: only one actor can own the connection, and every call blocks its scheduler thread for the ODBC call’s duration.
DbSession is an actor that wraps a Connection and exposes its operations as behaviours. Each behaviour takes a Promise that’s fulfilled when the operation completes.
actor DbSession
new create(dsn: Dsn, validate_utf8: Bool = true)
be exec(sql: String val,
promise: Promise[(RowCount | ExecError)])
be query(sql: String val,
promise: Promise[(Array[Row val] val | ExecError)])
be begin(promise: Promise[(TxBegun | TxBeginError)])
be commit(promise: Promise[(TxCommitted | TxCommitError)])
be rollback(promise: Promise[(TxRolledBack | TxRollbackError)])
be close()
Shape follows Connection one-for-one, with two differences:
- Results come back through
Promises, not synchronous returns. queryreturns all rows as a sendableArray[Row val] valrather than a non-sendableCursor.
Cursor and Statement are ref — unsendable. So DbSession.query buffers the whole result set into memory. For very large result sets that’s the wrong trade; build an actor per query with the cursor private. For ordinary sets it’s simpler and good enough.
What DbSession doesn’t expose¶
prepare isn’t a behaviour, and neither is Statement: a Statement is tied to its Connection and can’t be sent out. If you need prepared statements inside an actor, build a custom actor that owns its own Connection and Statements privately.
A chained workflow¶
Promises compose. The sample does DROP → CREATE → INSERT → SELECT → DROP, each step triggered by the previous promise’s fulfillment:
use "lib:odbc"
use "odbc"
use "promises"
actor Main
let _env: Env
let _db: DbSession
new create(env: Env) =>
_env = env
let dsn_name =
try env.args(1)?
else "psqlred"
end
_db = DbSession(Dsn("DSN=" + dsn_name))
// Chain: CREATE -> INSERT -> SELECT -> DROP -> close.
let create_p = Promise[(RowCount | ExecError)]
_db.exec("DROP TABLE IF EXISTS tut_session", create_p)
let ct_p = Promise[(RowCount | ExecError)]
create_p.next[None](recover this~_after_drop(ct_p) end)
let ins_p = Promise[(RowCount | ExecError)]
ct_p.next[None](recover this~_after_create(ins_p) end)
let sel_p = Promise[(Array[Row val] val | ExecError)]
ins_p.next[None](recover this~_after_insert(sel_p) end)
sel_p.next[None](recover this~_after_select() end)
be _after_drop(next_p: Promise[(RowCount | ExecError)],
result: (RowCount | ExecError))
=>
_log_exec("drop", result)
_db.exec(
"CREATE TABLE tut_session (id INTEGER, label VARCHAR(32))",
next_p)
be _after_create(next_p: Promise[(RowCount | ExecError)],
result: (RowCount | ExecError))
=>
_log_exec("create", result)
_db.exec(
"INSERT INTO tut_session VALUES (1, 'alpha'), (2, 'bravo')",
next_p)
be _after_insert(
next_p: Promise[(Array[Row val] val | ExecError)],
result: (RowCount | ExecError))
=>
_log_exec("insert", result)
_db.query(
"SELECT id, label FROM tut_session ORDER BY id", next_p)
be _after_select(result: (Array[Row val] val | ExecError)) =>
match result
| let rows: Array[Row val] val =>
for row in rows.values() do
try
let id =
match \exhaustive\ row.int(ColIndex(1))?
| let v: I64 => v.string()
| SqlNull => "NULL"
end
let label =
match \exhaustive\ row.text(ColIndex(2))?
| let v: String val => v
| SqlNull => "NULL"
end
_env.out.print(id + " " + label)
end
end
| let e: ExecError =>
_env.err.print("select: " + e.string())
end
// Fire-and-forget cleanup; then shut the session.
let cleanup = Promise[(RowCount | ExecError)]
_db.exec("DROP TABLE IF EXISTS tut_session", cleanup)
cleanup.next[None](recover this~_after_cleanup() end)
be _after_cleanup(result: (RowCount | ExecError)) =>
_log_exec("cleanup", result)
_db.close()
fun _log_exec(label: String val, result: (RowCount | ExecError)) =>
match result
| let n: USize =>
_env.out.print(label + ": " + n.string() + " rows")
| NoRowCount =>
_env.out.print(label + ": (no row count)")
| let e: ExecError =>
_env.err.print(label + ": " + e.string())
end
./build/11-dbsession
drop: 0 rows
create: 0 rows
insert: 2 rows
1 alpha
2 bravo
cleanup: 0 rows
Why pass promises in, rather than returning them?¶
Behaviours take a Promise as a parameter rather than returning one. Two properties this buys you:
- Callers decide the promise type —
Promise[Result], orPromise[MyCustomType]populated via.next[MyCustomType]before the behaviour completes. - The handoff is visible:
let p = Promise[...]; db.exec(sql, p)reads more clearly than a chained fluent call.
Minor, but worth recognising when you chain behaviours.
Closing down cleanly¶
close() is a behaviour — queued after any outstanding work. Send it last and the connection shuts down once the final operation’s promise is fulfilled.
The sample’s _after_cleanup does exactly that: it fires the final close() after the last DROP’s promise resolves, so the session processes its queue in order and exits.