pre-initialization queue 预初始化队列
处理数据库初始化之前的请求
数据库在 0.5 秒后才连接成功。在此之前收到的请求会被封装成任务, 放入任务队列中; 等到数据库连接成功后, 再依次执行任务队列中的所有任务。
import { EventEmitter } from 'events'
/**
 * Simulated database client with a pre-initialization queue.
 *
 * Queries issued before the connection is ready are wrapped in commands and
 * stored in `commandsQueue`; `connect()` replays them once the (simulated)
 * connection has been established.
 */
class DB extends EventEmitter {
  // Becomes true inside the connect() timer callback.
  connected = false
  // Deferred commands; each one re-issues a query that arrived too early.
  commandsQueue = []

  /**
   * Runs a query, or queues it when the connection is not ready yet.
   *
   * @param {string} queryString - The query text (only logged by this simulation).
   * @returns {Promise<void>} Settles when the query has actually been executed,
   *   including when it had to wait in the queue first.
   */
  async query (queryString) {
    if (!this.connected) {
      console.log(`Request queued: ${queryString}`)
      // Command pattern: the closure captures everything needed to run the
      // query later. The returned promise is settled by that deferred run, so
      // callers awaiting query() wait for the real execution (and see its
      // errors) instead of resuming as soon as the request is queued.
      return new Promise(resolve => {
        const command = () => {
          resolve(this.query(queryString))
        }
        this.commandsQueue.push(command)
      })
    }
    console.log(`Query executed: ${queryString}`)
  }

  /**
   * Simulates establishing the connection: after 500 ms the client is marked
   * as connected and every queued command is executed.
   */
  connect () {
    // Simulate the latency of opening a network connection.
    setTimeout(() => {
      this.connected = true
      // Flush everything that was requested before the connection was ready.
      this.commandsQueue.forEach(command => command())
      this.commandsQueue = []
    }, 500)
  }
}

export const db = new DB()
import { db } from './db.js'
// Start connecting; the queries below are issued without waiting for it.
db.connect()

/**
 * Issues an INSERT carrying the current timestamp through the shared db client.
 */
const updateLastAccess = async () => {
  const statement = `INSERT (${Date.now()}) INTO "LastAccesses"`
  await db.query(statement)
}

// First request: sent immediately after connect() was called.
updateLastAccess()

// Visual separator printed on the next tick, between the two requests.
process.nextTick(() => console.log('-----------------------'))

// Second request: sent 600 ms later.
setTimeout(updateLastAccess, 600)
State 状态模式
将 DB 类中 query 的具体实现按状态抽取到独立的状态类中, 以提升其模块化程度
import { EventEmitter } from 'events'
// Names of the methods for which QueuingState installs a queuing implementation.
const METHODS_REQUIRING_CONNECTION = ['query']
// Symbol key of the hook that DB.connect() calls on the state being replaced.
const deactivate = Symbol('deactivate')
/**
 * State used once the connection is available: queries are executed right away.
 */
class InitializedState {
  /**
   * Executes the query (this simulation only logs it).
   *
   * @param {string} queryString - The query text.
   * @returns {Promise<void>}
   */
  async query (queryString) {
    const message = `Query executed: ${queryString}`
    console.log(message)
  }
}
/**
 * State used while the connection is not ready: every method listed in
 * METHODS_REQUIRING_CONNECTION is replaced by a version that queues the call.
 */
class QueuingState {
  /**
   * @param {object} db - The object whose methods are invoked when the queue
   *   is flushed (the DB instance that owns this state).
   */
  constructor (db) {
    this.db = db
    this.commandsQueue = []

    // For each operation that needs a connection (e.g. query)...
    METHODS_REQUIRING_CONNECTION.forEach(methodName => {
      this[methodName] = function (...args) {
        console.log('Command queued:', methodName, args)
        // ...capture the operation and its arguments in a closure. The
        // returned promise is settled by the deferred call, so the caller
        // receives the real result or error instead of `undefined`.
        return new Promise((resolve, reject) => {
          const command = () => {
            try {
              resolve(db[methodName](...args))
            } catch (err) {
              // A synchronous throw rejects this call only; it must not
              // abort the flush of the remaining commands.
              reject(err)
            }
          }
          this.commandsQueue.push(command)
        })
      }
    })
  }

  /**
   * Hook called when this state is replaced: runs every queued command against
   * `db`, then empties the queue.
   */
  [deactivate] () {
    this.commandsQueue.forEach(command => command())
    this.commandsQueue = []
  }
}
/**
 * Database client built on the State pattern: `query()` is delegated to the
 * current state object, which is swapped when the connection is established.
 */
class DB extends EventEmitter {
  constructor () {
    super()
    // Start in the "not yet initialized" state.
    this.state = new QueuingState(this)
  }

  /**
   * Forwards the query to whichever state is currently active.
   *
   * @param {string} queryString - The query text.
   * @returns {Promise<*>} Whatever the active state's query() produces.
   */
  async query (queryString) {
    const { state } = this
    return state.query(queryString)
  }

  /**
   * Simulates connecting: after 500 ms the state is switched to
   * InitializedState and the previous state's deactivate hook, if it has one,
   * is invoked.
   */
  connect () {
    const onConnected = () => {
      const previousState = this.state
      this.connected = true
      // Switch state first so the hook below runs against the new state.
      this.state = new InitializedState(this)
      if (previousState[deactivate]) {
        previousState[deactivate]()
      }
    }
    setTimeout(onConnected, 500)
  }
}

export const db = new DB()
批处理相同异步请求
实现方式: 将请求对应的 Promise 存入一个 Map 中; 如果遇到相同的请求, 并且该请求还未完成, 就直接返回同一个 Promise, 让多个调用方共享同一次请求的结果
处理请求的服务器
import { createServer } from 'http'
// import { totalSales } from './totalSales.js'
import { totalSales } from './totalSalesBatch.js'
// import { totalSales } from './totalSalesCache.js'
// HTTP server on port 8000: answers every request with the total sales for the
// `product` query parameter, as JSON.
createServer(async (req, res) => {
  try {
    const url = new URL(req.url, 'http://localhost')
    const product = url.searchParams.get('product')
    console.log(`Processing query: ${url.search}`)

    const sum = await totalSales(product)

    res.setHeader('Content-Type', 'application/json')
    res.writeHead(200)
    res.end(JSON.stringify({
      product,
      sum
    }))
  } catch (err) {
    // Without this, a failure in the async handler becomes an unhandled
    // rejection and the client never receives a response.
    console.error(err)
    if (!res.headersSent) {
      res.writeHead(500, { 'Content-Type': 'application/json' })
    }
    res.end(JSON.stringify({ error: 'Internal Server Error' }))
  }
}).listen(8000, () => console.log('Server started'))
未批量处理请求
import level from 'level'
import sublevel from 'subleveldown'
// 'sales' sub-database of the local 'example-db' store, with JSON-encoded values.
const db = level('example-db')
const salesDb = sublevel(db, 'sales', { valueEncoding: 'json' })

/**
 * Sums the `amount` of every stored transaction, optionally restricted to one
 * product. Scans the whole value stream on every call and logs how long it took.
 *
 * @param {string} [product] - Product to filter by; a falsy value sums all transactions.
 * @returns {Promise<number>} The total amount.
 */
export async function totalSales (product) {
  const startedAt = Date.now()
  let total = 0

  for await (const { product: soldProduct, amount } of salesDb.createValueStream()) {
    // Skip transactions of other products when a filter was given.
    if (product && soldProduct !== product) {
      continue
    }
    total += amount
  }

  console.log(`totalSales() took: ${Date.now() - startedAt}ms`)
  return total
}
十万数据模拟
import level from 'level'
import sublevel from 'subleveldown'
import nanoid from 'nanoid'
// 'sales' sub-database of the local 'example-db' store, with JSON-encoded values.
const db = level('example-db')
const salesDb = sublevel(db, 'sales', { valueEncoding: 'json' })
const products = ['book', 'game', 'app', 'song', 'movie']

/**
 * Inserts 100000 random sale records, one after another, each with an amount
 * in 1..100 and a product picked at random from `products`.
 *
 * @returns {Promise<void>} Resolves once every record has been written.
 */
async function populate () {
  for (let i = 0; i < 100000; i++) {
    await salesDb.put(nanoid(), {
      amount: Math.ceil(Math.random() * 100),
      // Derive the range from the list so editing `products` cannot leave
      // entries unreachable or index past the end.
      product: products[Math.floor(Math.random() * products.length)]
    })
  }
  console.log('DB populated')
}

// Report a failed write instead of leaving the rejection unhandled.
populate().catch(err => {
  console.error(err)
  process.exitCode = 1
})
二十次请求模拟
import superagent from 'superagent'
// Load generator: sends 20 GET requests to the local server, one every 200 ms,
// and reports the total elapsed time once all of them have finished.
const start = Date.now()
let count = 20
let pending = count
const interval = 200
// Query string taken from the first CLI argument, defaulting to one product.
const query = process.argv[2] || 'product=book'

/**
 * Sends one request and schedules the next one until `count` reaches zero.
 */
function sendRequest () {
  superagent.get(`http://localhost:8000?${query}`)
    .then(result => {
      console.log(result.status, result.body)
    })
    .catch(err => {
      // A failed request must not crash the run with an unhandled rejection.
      console.error(`Request failed: ${err.message}`)
    })
    .finally(() => {
      // Count failures as finished too, so the summary is always printed.
      if (!--pending) {
        console.log(`All completed in: ${Date.now() - start}ms`)
      }
    })

  if (--count) {
    setTimeout(sendRequest, interval)
  }
}

sendRequest()
缓存请求
将尚未完成的请求 (Promise) 存入映射表; 如果遇到相同的请求, 并且该请求还未完成, 就直接返回同一个 Promise; 请求处理完毕后, 再将其从映射表中删除
import { totalSales as totalSalesRaw } from './totalSales.js'
// In-flight requests, keyed by product: product -> promise of its total.
const runningRequests = new Map()

/**
 * Batching wrapper around the raw totalSales: while a request for a product is
 * still running, further calls for the same product receive the same promise.
 *
 * @param {string} product - Product to compute the total for.
 * @returns {Promise<number>} The promise of the (possibly shared) request.
 */
export function totalSales (product) {
  if (runningRequests.has(product)) {
    console.log('Batching')
    return runningRequests.get(product)
  }

  const resultPromise = totalSalesRaw(product)
  runningRequests.set(product, resultPromise)

  // Drop the entry once the request has settled, whatever the outcome.
  // then(fn, fn) is used instead of finally(): the promise returned by
  // finally() re-rejects with the original error, which nobody handles and so
  // surfaces as an unhandled rejection even if callers handle resultPromise.
  const forget = () => {
    runningRequests.delete(product)
  }
  resultPromise.then(forget, forget)

  return resultPromise
}
缓存结果
请求执行完毕后, 也不急着把相关的 Promise 从映射表中删除, 而是保留一段时间 (TTL), 期间相同的请求直接命中缓存; 请求失败时则立即删除
import { totalSales as totalSalesRaw } from './totalSales.js'
const CACHE_TTL = 30 * 1000 // 30 seconds TTL
// product -> promise of its total (pending or already fulfilled).
const cache = new Map()

/**
 * Caching wrapper around the raw totalSales: the promise for a product is
 * reused while the request is running and for CACHE_TTL milliseconds after it
 * succeeds. A failed request is evicted immediately.
 *
 * @param {string} product - Product to compute the total for.
 * @returns {Promise<number>} The cached or newly started request.
 */
export function totalSales (product) {
  if (cache.has(product)) {
    console.log('Cache hit')
    return cache.get(product)
  }

  const resultPromise = totalSalesRaw(product)
  cache.set(product, resultPromise)

  resultPromise.then(() => {
    // Success: keep the result until the TTL expires.
    setTimeout(() => {
      cache.delete(product)
    }, CACHE_TTL)
  }, () => {
    // Failure: evict right away so the next call retries. The error is not
    // rethrown here: this derived promise has no consumer, so rethrowing would
    // only produce an unhandled rejection. Callers still see the failure
    // through resultPromise.
    cache.delete(product)
  })

  return resultPromise
}
异步取消
出错取消
在每次异步调用之后检查取消标志, 若已请求取消, 则手动抛出 CancelError => 由调用方的 catch 处理
import { asyncRoutine } from './asyncRoutine.js'
import { CancelError } from './cancelError.js'
/**
 * Runs async routines A, B and C in sequence, logging each result. After A and
 * after B the cancellation flag is inspected and, if it is set, the function
 * stops by throwing a CancelError.
 *
 * @param {{ cancelRequested: boolean }} cancelObj - Flag holder owned by the caller.
 * @returns {Promise<void>}
 * @throws {CancelError} When cancellation was requested before B or C started.
 */
async function cancelable (cancelObj) {
  console.log(await asyncRoutine('A'))
  if (cancelObj.cancelRequested) {
    throw new CancelError()
  }

  console.log(await asyncRoutine('B'))
  if (cancelObj.cancelRequested) {
    throw new CancelError()
  }

  console.log(await asyncRoutine('C'))
}
// Shared flag: the running function reads it, the timer below sets it.
const cancelObj = { cancelRequested: false }

// Tells a cancellation apart from a genuine failure.
const reportOutcome = err => {
  if (err instanceof CancelError) {
    console.log('Function canceled')
    return
  }
  console.error(err)
}

cancelable(cancelObj).catch(reportOutcome)

// Request cancellation after 100 ms.
setTimeout(() => {
  cancelObj.cancelRequested = true
}, 100)
缺点: 每次异步调用完成后, 都要手动判断是否需要退出, 存在代码冗余
/**
 * Error used to signal that an asynchronous operation was canceled on request.
 * Besides `instanceof`, it can be recognized through `isCanceled`.
 */
export class CancelError extends Error {
  constructor () {
    super('Canceled')
    // Without an explicit name the error is reported as a plain "Error".
    this.name = 'CancelError'
    this.isCanceled = true
  }
}
/**
 * Simulated asynchronous task: logs when it starts, then after 100 ms logs
 * completion and resolves with a result string.
 *
 * @param {string} label - Name used in the log lines and in the result.
 * @returns {Promise<string>} Resolves with `Async routine <label> result`.
 */
export function asyncRoutine (label) {
  console.log(`Starting async routine ${label}`)

  const DELAY_MS = 100
  return new Promise(resolve => {
    const finish = () => {
      console.log(`Async routine ${label} completed`)
      resolve(`Async routine ${label} result`)
    }
    setTimeout(finish, DELAY_MS)
  })
}
包装器的使用
将异步操作的调用及取消判断逻辑抽取到包装器中
具体使用
import { asyncRoutine } from './asyncRoutine.js'
import { createCancelWrapper } from './cancelWrapper.js'
import { CancelError } from './cancelError.js'
// Every asynchronous call has to go through the wrapper, which makes the code
// somewhat verbose.
/**
 * Runs async routines A, B and C in sequence through `cancelWrapper`, logging
 * each result.
 *
 * @param {Function} cancelWrapper - Called as `cancelWrapper(fn, ...args)`; its
 *   awaited result is logged, and a rejection from it ends this function.
 * @returns {Promise<void>}
 */
async function cancelable (cancelWrapper) {
  console.log(await cancelWrapper(asyncRoutine, 'A'))
  console.log(await cancelWrapper(asyncRoutine, 'B'))
  console.log(await cancelWrapper(asyncRoutine, 'C'))
}
// One wrapper/cancel pair controls this single run of cancelable().
const { cancelWrapper, cancel } = createCancelWrapper()

// Tells a cancellation apart from a genuine failure.
const reportOutcome = err => {
  if (err instanceof CancelError) {
    console.log('Function canceled')
    return
  }
  console.error(err)
}

cancelable(cancelWrapper).catch(reportOutcome)

// Request cancellation after 100 ms.
setTimeout(cancel, 100)
具体实现
import { CancelError } from './cancelError.js'
/**
 * Creates a `cancelWrapper` / `cancel` pair that share one cancellation flag.
 *
 * @returns {{ cancelWrapper: Function, cancel: Function }}
 *   - `cancelWrapper(func, ...args)` returns `func(...args)`, or a promise
 *     rejected with CancelError (without calling `func`) once cancellation
 *     has been requested.
 *   - `cancel()` requests cancellation.
 */
export function createCancelWrapper () {
  const state = { cancelRequested: false }

  const cancel = () => {
    state.cancelRequested = true
  }

  const cancelWrapper = (func, ...args) =>
    state.cancelRequested
      ? Promise.reject(new CancelError())
      : func(...args)

  return { cancelWrapper, cancel }
}
生成器实现叫停函数
使用
import { asyncRoutine } from './asyncRoutine.js'
import { createAsyncCancelable } from './createAsyncCancelable.js'
import { CancelError } from './cancelError.js'
// The generator yields each asynchronous step; createAsyncCancelable drives it
// and feeds each awaited result back in as the value of the `yield`.
const cancelable = createAsyncCancelable(function * routineSteps () {
  console.log(yield asyncRoutine('A'))
  console.log(yield asyncRoutine('B'))
  console.log(yield asyncRoutine('C'))
})

// Start one run; `cancel` affects only this run.
const { promise, cancel } = cancelable()

promise.catch(err => {
  // Tell a cancellation apart from a genuine failure.
  if (!(err instanceof CancelError)) {
    console.error(err)
    return
  }
  console.log('Function canceled')
})

// Request cancellation after 100 ms.
setTimeout(cancel, 100)
实现
import { CancelError } from './cancelError.js'
/**
 * Turns a generator function into a cancelable asynchronous function.
 *
 * Each value the generator yields is awaited and its result is passed back in
 * as the value of the `yield` (a rejection is thrown into the generator). Once
 * cancellation has been requested the generator is not resumed again.
 *
 * @param {GeneratorFunction} generatorFunction - Generator describing the steps.
 * @returns {Function} `asyncCancelable(...args)`, which starts a run and returns
 *   `{ promise, cancel }`: `promise` resolves with the generator's return
 *   value, rejects with an error the generator lets escape, or rejects with
 *   CancelError after `cancel()` was called.
 */
export function createAsyncCancelable (generatorFunction) {
  return function asyncCancelable (...args) {
    const generatorObject = generatorFunction(...args)
    let cancelRequested = false

    function cancel () {
      cancelRequested = true
    }

    const promise = new Promise((resolve, reject) => {
      async function nextStep (prevResult) {
        if (cancelRequested) {
          return reject(new CancelError())
        }
        if (prevResult.done) {
          return resolve(prevResult.value)
        }

        // Wait for the yielded value, remembering how to hand the outcome back
        // to the generator (as a result or as a thrown error).
        let resumeGenerator
        try {
          const value = await prevResult.value
          resumeGenerator = () => generatorObject.next(value)
        } catch (err) {
          resumeGenerator = () => generatorObject.throw(err)
        }

        // cancel() may have been called while waiting. Check before resuming,
        // otherwise the generator would run one more segment and start its
        // next asynchronous operation after cancellation.
        if (cancelRequested) {
          return reject(new CancelError())
        }

        try {
          nextStep(resumeGenerator())
        } catch (err) {
          // The generator threw or did not handle the injected error.
          reject(err)
        }
      }

      nextStep({})
    })

    return { promise, cancel }
  }
}