/**
* Hoctail client constructor options
* @typedef {object} ClientOptions
* @property {string} baseURL endpoint URL
* @property {string} [schema] schema ID, optional
* @property {string} [token] auth token, use API key instead, optional
* @property {string} [app] app name (format: 'ownerName/appName'), optional
* @property {string} [key] api KEY, required if no token supplied
* @property {string|number} [logLevel] minimal log level, default: 'LOG'
* @public
* */
/**
* Internal callback to use for query calls, async
* @callback QueryCallback
* @param {string} query - SQL query string
* @param {Array<*>} [params] - array of query parameters, optional
* @return {Promise<object[]>} result array
* @private
*/
/**
* User supplied callback to print logs
*
* Can use `console.log` in most cases
*
* It's a noop by default if not supplied (no logging)
* @callback LoggerFunction
* @param {...*} args - arguments to print out
* @return {void}
* @public
*/
/**
* Custom event message, is used in {@link EventCallback}
*
* Used internally
*
* Example query to get the event data:
* ```
* select *
* from "${schema}"."${relation}"
* where id = ANY(${ids});
* ```
*
* @typedef EventMessage
* @property {string} eventId event id, arbitrary tag
* @property {string} schema schema ID to select event from
* @property {string} relation table/view name to select event from
* @property {string[]} ids list of row IDs to select
* @protected
*/
/**
* Event reaction callback
* @callback EventCallback
* @param {EventMessage} message - event message
* @return {void}
* @protected
*/
/**
* Application context options
*
* Used as a pointer to a specific app context
* @typedef {Object} AppOptions
* @property {string} [id] - app id
* @property {string} [owner] - app owner
* @property {string} [name] - app name
* @property {boolean} [return=false] - set true if stx should return result
*/
/**
* Query id, a running number
*
* Identifies a query in the current session
* @private
* @var {number} query id
*/
let qid = 1
/**
* Serializes a function definition into string
*
* Example:
* ```
* createSrc((i) => { return i + 1 }, 41) =>
* '((i) => { return i + 1 })(41)'
* ```
* @private
* @param {function} func - function definition
* @param {...*} args - function arguments, serialized as JSON
* @return {string} serialized function
*/
function createSrc (func, ...args) {
const params = JSON.stringify(args).slice(1, -1)
return `(${func.toString()})(${params})`
}
/**
* @private
* @type {RegExp}
*/
const quoteCheck = /^"[^"';]+"$/
/**
* @private
* @type {Object<number, string>}
*/
const numCache = { 0: '' }
/**
* @private
* @param {number} len
* @return {string}
*/
function getNumSeq (len) {
let num = numCache[len]
if (num == null) {
num = Array(len).fill(undefined).map((_, i) => `$${i + 1}`).join(',')
numCache[len] = num
}
return num
}
const LOG = {
10: 'DEBUG5',
11: 'DEBUG4',
12: 'DEBUG3',
13: 'DEBUG2',
14: 'DEBUG1',
15: 'LOG',
17: 'INFO',
18: 'NOTICE',
19: 'WARNING',
20: 'ERROR',
DEBUG5: 10,
DEBUG4: 11,
DEBUG3: 12,
DEBUG2: 13,
DEBUG1: 14,
LOG: 15,
INFO: 17,
NOTICE: 18,
WARNING: 19,
ERROR: 20,
}
/**
* Normalizes remote procedure name into a canonical string
*
* And adds the arguments placeholders
*
* Example:
* ```
* func => "func"
* app.func1 => "app"."func1"
* "my app".func => "my app"."func"
* ```
*
* @private
* @param {string} rpcName - remote procedure name
* @param {number} argsLen - number of arguments
* @return {string} resulting name, format: "schema"."name"($1,$2,$3,...)
* @throws {Error} on wrong rpc name format
*/
function normalizeEndpoint (rpcName, argsLen) {
const error = () => new Error(`Endpoint name error: '${rpcName}'
Endpoint name should have one of the following formats:
- endpoint
- "endpoint name"
- app.endpoint
- "app name"."endpoint name"
`)
if (typeof rpcName !== 'string') {
throw error()
}
let [schema, name] = rpcName.split('.').map(element => {
if (!quoteCheck.exec(element)) {
element = `"${element}"`
if (!quoteCheck.exec(element)) {
throw error()
}
}
return element
})
const num = getNumSeq(argsLen)
return `${name ? `${schema}.${name}` : schema}(${num})`
}
/**
* Transaction wrapper class, a public API for client transaction
* @public
* @hideconstructor
*/
class Tx {
/**
* Creates transaction, do not call it directly, use {@link HClient#tx}
* @param {QueryCallback} cQuery - query executor function
* @param {string=} [tid] - transaction id, optional
*/
constructor (cQuery, tid) {
/**
* @private
* @type {QueryCallback}
*/
this._queryFunc = cQuery
/**
* Transaction id
* @type {string|null}
* @readonly
*/
this.tid = tid
}
/**
* Run SQL query (in transaction block)
* @param {string} query - SQL query string
* @param {Array<*>} [params] - array of query parameters
* @return {Promise<object[]>} array of results
* @public
*/
async query (query, params) {
return this._queryFunc(query, params)
}
/**
* Commit the transaction, closes the transaction block
* @return {Promise<void>}
* @public
*/
async commit () {
await this._queryFunc('commit', [])
}
/**
* Rollback the transaction, closes the transaction block
* @return {Promise<void>}
* @public
*/
async rollback () {
await this._queryFunc('rollback', [])
}
/**
* Run function in a server context (in transaction block)
*
* Sync function only, use {@link HClient#wait} to wait for async function
*
* __Note: return value will be ignored!__
* @param {function} func - arbitrary function
* @param {...*} args - function arguments, will be JSON serialized
* @return {Promise<void>}
* @public
*/
async run (func, ...args) {
await this._queryFunc('hoc run', [createSrc(func, ...args)])
}
/**
* Call a remote procedure (in transaction block)
*
* Sync function only, use {@link HClient#wait} to wait for async function
* @param {string} endpoint - procedure name
* @param {...*} args - array of arguments, will be JSON serialized
* @return {Promise<*>} returns procedure results if any
* @public
*/
async call (endpoint, ...args) {
const rows = await this._queryFunc(`select ${normalizeEndpoint(endpoint, args.length)} as value`, args)
return rows.length > 0 ? rows[0].value : null
}
}
/**
* Hoctail query client public API. It's a base class and not intended for direct use.
* Check inherited instead:
* {@link Client}
*/
class HClient {
/**
* Creates a new client. This is a base class, see
*
* Typical usage:
* `new HClient({ baseURL, key }, console.log)`
* @constructor
* @param {ClientOptions} options - config options
* @param {LoggerFunction} [logger] - logger function, optional, noop by default
*/
constructor (options, logger) {
/**
* Base endpoint URL
* @type {string}
* @protected
*/
this.baseURL = options.baseURL
/**
* Logger function
* @type {LoggerFunction|null}
*/
this.logger = logger
/**
* WebSocket instance
* @type {*}
* @protected
*/
this.ws = null
/**
* Schema ID
* @type {string|null}
* @protected
*/
this.schema = options.schema
/**
* Auth token
* @type {string|null}
* @protected
*/
this.token = options.token
/**
* App name
* @type {string|null}
* @protected
*/
this._app = options.app
/**
* Event message user callback
* @type {EventCallback|null}
* @private
*/
this._eventCallback = null
if (!this.token && options.key) {
this.token = `HOC-API ${options.key}`
}
/**
* Alias for {@link HClient#query}
* @type {function(string, Array<*>, boolean): Promise<Array<Object>>}
*/
this.q = this.query
/**
* Default log level to use for the logger
* @type {number}
*/
this.logLevel = parseLogLevel(options.logLevel)
this._logsQueue = new Map()
this._terminating = false
this._connecting = false
}
/**
* Get a slot from queue
* @param {number} qid
* @param {number} priority
* @protected
* @returns {Promise<void>}
*/
async _slotGet (qid, priority) {
throw new Error('Not implemented')
}
/**
* Return a slot to queue
* @param {number} qid
* @protected
* @returns {void}
*/
_slotFree (qid) {
throw new Error('Not implemented')
}
/**
* Flush all the queue slots
* @protected
* @returns {Promise<void>}
*/
_flushQueue () {
throw new Error('Not implemented')
}
/**
* Start the client heartbeat
* @protected
*/
heartbeat () {
function hb (client, callback) {
clearTimeout(client.ws.pingTimeout)
client.ws.pingTimeout = setTimeout(() => {
client.terminate(1000, `Timed out`)
callback(new Error(`Timed out`))
}, 30000 + 1000)
}
this.ws.on('ping', () => {
hb(this, (e) => {
if (e) {
console.log(e.message)
}
})
})
}
/**
* Create websocket and set up authentication
* @protected
*/
createSocket () {
throw new Error('Not implemented')
}
/**
* Check if connection is closed
* @type {boolean}
* @protected
*/
get closed () {
return this.ws == null
}
/**
* Decode msgpack event data and cache decoded event
* @param {MessageEvent} event
* @return {object}
* @protected
*/
decode (event) {
throw new Error('Not implemented')
}
/**
* Encode message as msgpack buffer
* @param {object} obj
* @return {Uint8Array}
* @protected
*/
encode (obj) {
throw new Error('Not implemented')
}
/**
* Get connection endpoint path (href)
* @return {string}
* @protected
*/
getEndpoint () {
return new URL((this.app || '') + '/', this.baseURL).href
}
/**
* @return {Promise<void>}
* @private
*/
async _wsConnect () {
return new Promise((resolve, reject) => {
this.createSocket()
this.ws.binaryType = 'arraybuffer'
const onMsg = messageHandler.bind(this)
function onClose () {
clearTimeout(this.pingTimeout)
}
const onError = (e) => {
this.terminate(1000, e.message)
reject(e)
}
this.ws.addEventListener('open', () => {
this.heartbeat()
// temp remove error listener, will be re-added later in `connect()`
this.ws.removeEventListener('error', onError)
resolve()
})
this.ws.addEventListener('close', onClose)
this.ws.addEventListener('error', onError)
this.ws.addEventListener('message', onMsg)
})
}
/**
* @return {Promise<void>}
* @private
*/
async _connect () {
// wait until state is CLOSED or OPEN
while (this._terminating || this._connecting) {
await new Promise((resolve) => {
setTimeout(resolve, 100)
})
}
if (this.closed) {
this._connecting = true
const sleep = 3000
for (let attempts = 3; attempts > 0; attempts--) {
try {
await this._wsConnect()
this.ws.addEventListener('error', (e) => {
this.terminate(1000, e.message)
throw new Error('WebSocket closed abnormally')
})
this._connecting = false
return
} catch (e) {
// in node `ws` we do have access to client errors
if (e.target && e.target._req && e.target._req.res) {
const code = e.target._req.res.statusCode
const msg = e.target._req.res.statusMessage
if (code >= 400 && code < 500) {
// we try to create a better error here
throw new Error(`Failed to connect, client error or permission denied. HTTP code: ${code} ${msg}`)
}
}
console.error(e.message)
if (attempts > 1) {
await new Promise((resolve) => {
console.error(`Sleeping between reconnect attempts...`)
setTimeout(resolve, sleep)
})
}
}
}
this._connecting = false
throw new Error(`Failed to reconnect`)
}
}
/**
* Explicit connect to server, optional
*
* Lazy (re)connection is used in all of the query functions
* @return {Promise<void>}
* @public
*/
async connect () {
return this._connect()
}
/**
* Create an implicit transaction block
* @param {boolean} [wait=false] - if true should wait for async call to complete
* @return {Promise<Tx>} resolves to a transaction instance
* @private
*/
async _newTx (wait) {
await this._connect()
return new Tx(async (query, params) =>
(await _wsQuery(this, query, params, undefined, wait)).rows)
}
/**
* Run SQL query within an implicit transaction
* @param {string} query - SQL query string
* @param {Array<*>} [params] - {@link Array} of query parameters
* @param {boolean} [wait] - will wait for any server side {@link Promise}s to resolve
* @return {Promise<object[]>} {@link Array} of result objects
* @public
*/
async query (query, params, wait) {
const tx = await this._newTx(wait)
return tx.query(query, params)
}
/**
* Run a custom function in a server context
*
* __Note: return value will be ignored!__
* @example
* let res = await client.run(() => {
* console.log('will be logged')
* return 42
* })
* // `res` will be `null`
*
* // compare to `wait()`
* res = await client.wait(() => {
* console.log('will be logged')
* return 42
* })
* // `res` will be `42`
*
* @param {function} func - function definition
* @param {...*} args - function arguments
* @return {Promise<void>} doesn't return anything
* @public
*/
async run (func, ...args) {
const tx = await this._newTx()
await tx.run(func, ...args)
}
/**
* Call a remote procedure
*
* Returns result, if synchronous
* @param {string} endpoint - remote procedure name
* @param {...*} args - remote procedure arguments
* @return {Promise<*>} result, if any
* @public
*/
async call (endpoint, ...args) {
const tx = await this._newTx()
return tx.call(endpoint, ...args)
}
/**
* Call an async remote procedure or execute a function and wait for the result
*
* The only way to `await` for async functions/procedures
*
* __Note: will open multiple transactions! (after each await/Promise)__
* @param {function|string} func - inline function or remote procedure name
* @param {...*} args - function or procedure arguments
* @return {Promise<*>} result, if any
* @public
* @example
* let res = await hoctail.wait(() => { return BigInt(Number.MAX_SAFE_INTEGER + 1) })
* // res = 9007199254740992n
* res = await hoctail.wait(async () => {
* // first transaction
* let sum = await Promise.resolve(2)
* // second transaction (after await)
* sum += await Promise.resolve(3)
* // third transaction (after another await)
* return sum
* })
* // res = 5
*/
async wait (func, ...args) {
const tx = await this._newTx(true)
if (typeof func === 'string') {
return tx.call(func, ...args)
} else if (typeof func === 'function') {
const rows = await tx.query('hoc await', [createSrc(func, ...args)])
return rows.length > 0 ? rows[0].value : null
} else {
throw new TypeError(`Unknown parameter type, should be string or function`)
}
}
/**
* Create transaction and optionally execute a local sync function in the transaction block context
* @example
* // create transaction and execute queries
* // (runs locally, sends queries to server and reads in the results)
* await client.tx(async (tx) => {
* const rows = await tx.query(`select * from table`)
* for (let row of rows) {
* await tx.query(`insert into table2 values ($1, $2)`, [row.col1, row.col2])
* }
* return rows
* })
* // the above is equivalent to:
* // (runs remotely, compiles and executes in one go)
* await client.wait(() => {
* const rows = hoc.sql(`select * from table`)
* for (let row of rows) {
* hoc.sql(`insert into table2 values ($1, $2)`, [row.col1, row.col2])
* }
* return rows
* })
* // `client.tx()` will be much slower than `client.wait()`
*
* // create and use an explicit transaction
* const tx = await client.tx()
* await tx.query(`insert into table values($1, $2)`, [1, 'a'])
* // ....
* // you cannot hold here too long, server will kill long transactions
* await tx.commit()
* @param {function} [func] - function to execute locally in the transaction block, optional
* @return {Promise<Tx|*>} transaction class or result of executing the function
* @public
*/
async tx (func) {
await this._connect()
const { tid } = await _wsQuery(this, 'begin')
const tx = new Tx(async (query, params) => (await _wsQuery(this, query, params, tid)).rows, tid)
if (func == null) {
return tx
}
let res
try {
res = await func(tx)
} catch (e) {
await tx.rollback()
throw e
}
await tx.commit()
return res
}
/**
* Get the current user object
*
* Only `username` and `email` properties of the object are always returned.
* Other props are optional, (`password` is always hidden)
*
* @example
* {
* username: 'johndoe',
* firstname: 'John',
* lastname: 'Doe',
* email: 'jdoe@hoctail.io',
* picture: {},
* password: '********'
* }
* @return {Promise<Object<string, any>>}
* @public
*/
async user () {
return this.call('public.whoami')
}
/**
* The app name getter/setter
* @type {string}
* @public
*/
get app () {
return this._app
}
set app (arg) {
this._app = arg
}
/**
* Terminate the websocket
*
* Use `close()` instead
* @param {number} [code] - websocket exit code
* @param {string} [reason] - websocket connection close reason
* @public
*/
async terminate (code, reason) {
if (!this._terminating && !this.closed) {
this._terminating = true
await Promise.allSettled(this._logsQueue.values())
this._logsQueue.clear()
await this._flushQueue()
this.ws.close(code, reason)
if (typeof this.ws.terminate === 'function') {
this.ws.terminate()
}
this.ws = null
this._terminating = false
}
}
/**
* Add a custom event listener for server events
* @param {EventCallback} callback
* @protected
*/
eventListener (callback) {
this._eventCallback = callback
}
/**
* Restart the app, will re-require all the modules
* @public
*/
async restartApp () {
const tx = await this._newTx()
await tx.query('HOC RESTART')
}
/**
* Get the current env variables from server
* @public
*/
async getEnv () {
return this.wait(() => process.env)
}
/**
* Set env vars, values will be stringified and keys uppercased
*
* Will be merged with the current env
*
* __Note: you may need to run `restartApp()` to read in the changes (re-read `process.env`)__
* @param {Object<string, any>} env
* @public
*/
async setEnv (env) {
await this.run(env => {
process.env = Object.assign({}, process.env, env)
}, env)
}
/**
* Remove (undef) an env variable
*
* __Note: you may need to run `restartApp()` to read in the changes (re-read `process.env`)__
* @param {string} name
* @public
*/
async delEnv (name) {
await this.run(name => {
delete process.env[name]
}, name)
}
/**
* Execute code in a server-side transaction
*
* @example
* // Create a new table
* client.stx(store => {
* store.system.schema.addTable('My Table')
* })
* // Add a table to another app
* client.stx(store => {
* store.system.schema.addTable('Some Table')
* }, { owner: 'username', name: 'app' })
* @param {Function} func - executed function
* @param {AppOptions} [options] - app transaction context options
* @returns {Promise<void>}
* @public
*/
async stx (func, options) {
let waitRes
if (typeof options === 'object' && 'return' in options) {
waitRes = !!options.return
delete options.return
if (!Object.keys(options).length) options = undefined
}
const tx = await this._newTx(waitRes)
const res = await tx._queryFunc(`hoc ${waitRes ? 'await' : 'run'}`, [`
(options => {
const { serverSideTx } = require('@hoctail/patch-interface')
return serverSideTx(hoc, ${func.toString()}, options)
})(${JSON.stringify(options)})
`])
return res.length > 0 ? res[0].value : null
}
/**
* Execute code in a server-side transaction and return result
* from server to client. Can be slower than `stx`.
*
* @example
* // Create a new table
* client.stx(store => {
* store.system.schema.addTable('My Table')
* })
* // Add a table to another app
* client.stx(store => {
* store.system.schema.addTable('Some Table')
* }, { owner: 'username', name: 'app' })
* @param {Function} func - executed function returning result
* @param {AppOptions} [options] - app transaction context options
* @returns {Promise<void>}
* @public
*/
async stxRet (func, options) {
if (typeof options === 'object') options.return = true
else options = { return: true }
return this.stx(func, options)
}
}
/**
* @typedef QueryObject
* @property {string} q SQL query string
* @property {object|Array<*>|string|null} params SQL query params
* @private
*/
/**
* Parse named params object and return a canonical query object with an array of params
*
* Example:
* ```
* {
* q: "select * from t where id = ${myID} and name = ${myName}",
* params: { myId: 1, myName: "test" }
* }
* ```
* is converted to:
* ```
* {
* q: "select * from t where id = $1 and name = $2",
* params: [1, "test"]
* }
* ```
* @param {QueryObject} queryObj
* @return {QueryObject}
* @private
*/
function parse_named (queryObj) {
const params_arr = []
let i = 0
if (queryObj.params == null || typeof queryObj.params === 'string' || Array.isArray(queryObj.params))
return queryObj
const query = queryObj.q.replace(/\${(\w+)}/g,
(match, p1) => {
params_arr[i++] = queryObj.params[p1]
return `$${i}`
})
if (params_arr.length > 0) {
queryObj.q = query
queryObj.params = params_arr
}
return queryObj
}
/**
* Construct {@link Error} object from a serialized error string
* @param {string} error - error string, JSON serialized
* @return {Error} resulting {@link Error} object
* @private
*/
function emitError (error) {
const errObj = JSON.parse(error)
const err = new Error(errObj.message)
const stack = err.stack
Object.assign(err, errObj)
if (err.stack === stack) {
// remove stacktrace generated by error constructor above
err.stack = stack.slice(0, stack.indexOf('\n'))
}
return err
}
/**
* Main query runner
* @param {HClient} client - client instance
* @param {string} query - SQL query
* @param {Array<*>} [params] - SQL query params, optional
* @param {string|null} [tid] - transaction ID, optional
* @param {boolean} [wait] - wait flag, optional, will wait for {@link Promise} resolution if true
* @return {Promise<object>} resulting rows {@link Object}
* @private
*/
async function _wsQuery (client, query, params, tid, wait = false) {
const queryObj = parse_named({ q: String(query), params: params })
params = queryObj.params
query = queryObj.q
if (params != null) {
params = (Array.isArray(params)) ? params : [params]
}
const id = qid++
await client._slotGet(id, 0)
return new Promise((resolve, reject) => {
const msg = {
q: query,
params,
tid,
qid: id,
schema: client.schema,
appname: client.app,
await: wait,
}
const timeout = setTimeout(() => {
reject(new Error(`Timed out waiting for: ${JSON.stringify(msg)}`))
}, 30000)
let onClose
const onMessage = function (event) {
const message = client.decode(event)
if (message.type === 'result' && message.id === id) {
try {
if (message.error) {
return reject(emitError(message.error))
}
resolve(message.msg)
} finally {
client.ws.removeEventListener('message', onMessage)
client.ws.removeEventListener('close', onClose)
clearTimeout(timeout)
client._slotFree(id)
}
}
}
onClose = () => {
try {
if (client.ws) {
client.ws.removeEventListener('message', onMessage)
}
} finally {
clearTimeout(timeout)
}
}
client.ws.addEventListener('message', onMessage)
client.ws.addEventListener('close', onClose)
try {
client.ws.send(client.encode(msg))
} catch (e) {
reject(e)
}
})
}
/**
* Root message handler
* @callback
* @param {MessageEvent} event - event to handle
* @return {void}
* @private
*/
async function messageHandler (event) {
const message = this.decode(event)
switch (message.type) {
case 'log':
const msg = message.msg
if (this.logger && msg.severity >= this.logLevel) {
if (msg.text) {
this.logger(msg.text)
return
}
const logQuery = async () => {
try {
const rows = await this.query(`select * from "${msg.schema}".logs where id = $1`, [msg.id])
if (rows.length > 0) {
const row = rows[0]
this.logger(row.message)
}
} catch (e) {
this.logger(e)
}
}
const promise = logQuery()
this._logsQueue.set(logQuery, promise)
await promise
this._logsQueue.delete(logQuery)
}
break
case 'event':
if (typeof this._eventCallback === 'function') {
this._eventCallback(message.msg)
}
}
}
/**
* Parse logLevel severity and return a numeric value, will fail on unknown level
* @param {string|number|null} [logLevel] - log level severity to parse
* @return {number} numeric severity level
*/
function parseLogLevel (logLevel) {
let outLevel = logLevel
if (logLevel == null) {
outLevel = LOG.LOG
} else if (typeof logLevel === 'string') {
outLevel = LOG[logLevel.toUpperCase()]
} else if (typeof logLevel === 'number' && LOG[logLevel] == null) {
outLevel = null
}
if (outLevel == null) {
throw new Error(`Unknown log severity level: ${logLevel}`)
}
return outLevel
}
HClient.Tx = Tx
HClient.default = HClient
HClient.LOG = LOG
HClient.paseLogLevel = parseLogLevel
module.exports = HClient