0
点赞
收藏
分享

微信扫一扫

Nodejs 中异步请求相关操作: 存入任务队列, 批处理相同请求, 取消异步操作

践行数据分析 2022-04-13 阅读 28
node.js

pre-initialization queue 预初始化队列

处理数据库初始化之前的请求

0.5 秒后, 数据库连接成功, 在这之前, 将请求的任务放入任务队列中, 等到数据库连接成功, 执行任务队列中的所有任务

import { EventEmitter } from 'events'

/**
 * Simulated database client with a pre-initialization queue.
 *
 * Queries issued before the connection is established are stored as commands
 * and replayed once `connect()` has finished.
 */
class DB extends EventEmitter {
  // True once the simulated connection has been established.
  connected = false
  // Commands (closures) waiting for the connection; drained by connect().
  commandsQueue = []

  /**
   * Runs a query, or queues it when the database is not connected yet.
   *
   * @param {string} queryString - The query to execute.
   * @returns {Promise<void>} Settles only after the query has actually been
   *   executed, even when it first had to wait in the queue.
   */
  async query(queryString) {
    if (!this.connected) {
      console.log(`Request queued: ${queryString}`)
      // Command pattern: the closure captures everything needed to run the
      // query later. The returned promise is settled by the replayed query,
      // so callers awaiting it really wait for the execution.
      return new Promise(resolve => {
        const command = () => {
          resolve(this.query(queryString))
        }
        this.commandsQueue.push(command)
      })
    }
    console.log(`Query executed: ${queryString}`)
  }

  /**
   * Simulates establishing the connection (500 ms delay) and then replays
   * every command queued in the meantime.
   */
  connect() {
    setTimeout(() => {
      this.connected = true
      // Detach the queue before replaying it so the instance never holds
      // commands that have already been executed.
      const pendingCommands = this.commandsQueue
      this.commandsQueue = []
      pendingCommands.forEach(command => command())
    }, 500)
  }
}

// Single DB instance exported for the other modules to import.
export const db = new DB()

import { db } from './db.js'

// Start connecting; in the db.js shown above the connection is simulated
// with a 500 ms timer, so it is not ready when the next lines run.
db.connect()

// Sends an INSERT carrying the current timestamp through db.query().
async function updateLastAccess() {
  await db.query(`INSERT (${Date.now()}) INTO "LastAccesses"`)
}
// First call: issued right after connect() was called.
updateLastAccess()
// Prints a separator once the synchronous part of this script has finished.
process.nextTick(() => {
  console.log('-----------------------')
})
// Second call: issued 600 ms later.
setTimeout(() => {
  updateLastAccess()
}, 600)

State 状态模式

将数据库中的 query 代码实现抽取到独立的状态类中, 以提升其模块化程度

import { EventEmitter } from 'events'

// Names of the DB methods that QueuingState intercepts and queues.
const METHODS_REQUIRING_CONNECTION = ['query']
// Symbol key of the hook that DB.connect() calls on the state it replaces.
const deactivate = Symbol('deactivate')

/**
 * State object whose `query` runs immediately instead of being queued.
 */
class InitializedState {
  /**
   * Executes the query right away (here: logs it).
   *
   * @param {string} queryString - The query to execute.
   * @returns {Promise<void>}
   */
  async query(queryString) {
    const message = `Query executed: ${queryString}`
    console.log(message)
  }
}

/**
 * State object that defers every method listed in
 * METHODS_REQUIRING_CONNECTION: calls are stored as commands and replayed
 * against the DB instance when the state is deactivated.
 */
class QueuingState {
  /**
   * @param {object} db - The DB instance the queued calls are replayed on.
   */
  constructor(db) {
    this.db = db
    this.commandsQueue = []

    // Install one queuing method per operation, e.g. `query`.
    METHODS_REQUIRING_CONNECTION.forEach(methodName => {
      this[methodName] = function (...args) {
        console.log('Command queued:', methodName, args)
        // The promise handed to the caller is settled by the replayed call,
        // so awaiting a queued operation waits for its real outcome.
        return new Promise((resolve, reject) => {
          // The closure keeps the operation name and its arguments.
          const command = () => {
            try {
              resolve(db[methodName](...args))
            } catch (err) {
              reject(err)
            }
          }
          this.commandsQueue.push(command)
        })
      }
    })
  }

  /**
   * Replays and clears every queued command. The queue is detached first so
   * the state never keeps commands that have already been executed.
   */
  [deactivate]() {
    const pendingCommands = this.commandsQueue
    this.commandsQueue = []
    pendingCommands.forEach(command => command())
  }
}

/**
 * Database client implemented with the State pattern: `query` is delegated
 * to the current state object, which is swapped when the connection is ready.
 */
class DB extends EventEmitter {
  constructor() {
    super()
    // Start explicitly disconnected so `connected` is always a boolean.
    this.connected = false
    // Initial state: not initialized yet, operations get queued.
    this.state = new QueuingState(this)
  }

  /**
   * Delegates the query to whichever state is currently active.
   *
   * @param {string} queryString - The query to execute.
   * @returns {Promise<*>} Whatever the active state's `query` produces.
   */
  async query(queryString) {
    return this.state.query(queryString)
  }

  /**
   * Simulates establishing the connection (500 ms delay), switches to the
   * initialized state and lets the previous state run its deactivate hook.
   */
  connect() {
    setTimeout(() => {
      this.connected = true
      const oldState = this.state
      // Switch state first so replayed commands hit the new state.
      this.state = new InitializedState(this)
      oldState[deactivate]?.()
    }, 500)
  }
}

// Single DB instance exported for the other modules to import.
export const db = new DB()

批处理相同异步请求

实现方式: 将请求/结果存入一个 Map 中, 如果遇到相同的请求, 并且该请求还未完成, 就直接返回相同的请求/结果

处理请求的服务器

import { createServer } from 'http'
// import { totalSales } from './totalSales.js'
import { totalSales } from './totalSalesBatch.js'
// import { totalSales } from './totalSalesCache.js'

// HTTP endpoint on port 8000: reads the `product` query parameter, computes
// the total sales for it and answers with a JSON document.
createServer(async (req, res) => {
  const url = new URL(req.url, 'http://localhost')
  const product = url.searchParams.get('product')
  console.log(`Processing query: ${url.search}`)

  let sum
  try {
    sum = await totalSales(product)
  } catch (err) {
    // Without this branch a failing lookup would leave the request hanging
    // and surface as an unhandled promise rejection.
    console.error(err)
    res.setHeader('Content-Type', 'application/json')
    res.writeHead(500)
    res.end(JSON.stringify({ error: 'Internal Server Error' }))
    return
  }

  res.setHeader('Content-Type', 'application/json')
  res.writeHead(200)
  res.end(JSON.stringify({
    product,
    sum
  }))
}).listen(8000, () => console.log('Server started'))

未批量处理请求

import level from 'level'
import sublevel from 'subleveldown'

// Open the 'example-db' store through `level` ...
const db = level('example-db')
// ... and derive the 'sales' section from it, with JSON-encoded values.
const salesDb = sublevel(db, 'sales', { valueEncoding: 'json' })

/**
 * Sums the `amount` of every stored sale, optionally restricted to one
 * product, and logs how long the scan took.
 *
 * @param {string} [product] - Product to filter by; when falsy every
 *   transaction is counted.
 * @returns {Promise<number>} The accumulated amount.
 */
export async function totalSales (product) {
  const startedAt = Date.now()
  // A falsy `product` disables the filter.
  const isWanted = sale => !product || sale.product === product

  let total = 0
  for await (const sale of salesDb.createValueStream()) {
    if (!isWanted(sale)) {
      continue
    }
    total += sale.amount
  }

  const elapsed = Date.now() - startedAt
  console.log(`totalSales() took: ${elapsed}ms`)

  return total
}

十万数据模拟

import level from 'level'
import sublevel from 'subleveldown'
import nanoid from 'nanoid'

// Open the 'example-db' store through `level` ...
const db = level('example-db')
// ... and derive the 'sales' section from it, with JSON-encoded values.
const salesDb = sublevel(db, 'sales', { valueEncoding: 'json' })
// Product names the generated sales are picked from.
const products = ['book', 'game', 'app', 'song', 'movie']

/**
 * Fills the sales store with random transactions, written one after another.
 *
 * @param {number} [count=100000] - Number of transactions to generate.
 * @returns {Promise<void>} Resolves once every record has been written.
 */
async function populate (count = 100000) {
  for (let i = 0; i < count; i++) {
    await salesDb.put(nanoid(), {
      // Integer in 1..100 (Math.ceil on the raw random value could yield 0).
      amount: Math.floor(Math.random() * 100) + 1,
      // Derive the range from the list so it stays valid if the list changes.
      product: products[Math.floor(Math.random() * products.length)]
    })
  }

  console.log('DB populated')
}

// Run the seeding routine; the returned promise is not awaited here.
populate()

二十次请求模拟

import superagent from 'superagent'

// Timestamp used to report the total duration once all requests are done.
const start = Date.now()
// Requests still to be sent.
let count = 20
// Requests whose response has not been received yet.
let pending = count
// Delay in milliseconds between two consecutive requests.
const interval = 200
// Query string: first CLI argument if given, otherwise 'product=book'.
const query = process.argv[2] ? process.argv[2] : 'product=book'

/**
 * Sends one request to the local server and, while requests remain, schedules
 * the next one after `interval` milliseconds.
 *
 * Failed requests are logged and still counted as completed, so the final
 * summary is printed even when some of them fail.
 */
function sendRequest () {
  superagent.get(`http://localhost:8000?${query}`)
    .then(result => {
      console.log(result.status, result.body)
    })
    .catch(err => {
      // Without a handler a failing request would be an unhandled rejection.
      console.error(`Request failed: ${err.message}`)
    })
    .finally(() => {
      if (!--pending) {
        console.log(`All completed in: ${Date.now() - start}ms`)
      }
    })

  if (--count) {
    setTimeout(sendRequest, interval)
  }
}

// Send the first request; the following ones are scheduled by sendRequest itself.
sendRequest()

缓存请求

将进行中的请求 (Promise) 存入 Map; 如果遇到相同的请求, 并且该请求还未完成, 就直接返回同一个 Promise, 请求完成后再将其从 Map 中删除

import { totalSales as totalSalesRaw } from './totalSales.js'

// In-flight requests keyed by product, so identical concurrent requests can share one promise.
const runningRequests = new Map()

/**
 * Batching wrapper around the raw `totalSales`: while a request for a product
 * is still running, further calls for the same product get the same promise.
 *
 * @param {string} [product] - Product to compute the total for.
 * @returns {Promise<number>} The promise of the (possibly shared) request.
 */
export function totalSales (product) {
  if (runningRequests.has(product)) {
    console.log('Batching')
    return runningRequests.get(product)
  }

  const resultPromise = totalSalesRaw(product)
  runningRequests.set(product, resultPromise)

  // Once the request has settled, remove it from the map. Using the same
  // handler for both outcomes keeps this side chain from rejecting; a bare
  // `.finally()` would re-raise a failure as an unhandled rejection, even
  // though the caller handles it on the returned promise.
  const forget = () => {
    runningRequests.delete(product)
  }
  resultPromise.then(forget, forget)

  return resultPromise
}

缓存结果

请求执行完毕, 也不急着把相关的 Promise 从映射表中删除, 而是保留一段时间 (下例为 30 秒) 后再删除; 如果请求失败, 则立即删除

import { totalSales as totalSalesRaw } from './totalSales.js'

// How long a fulfilled result stays in the cache.
const CACHE_TTL = 30 * 1000 // 30 seconds TTL
// Result promises keyed by product.
const cache = new Map()

/**
 * Caching wrapper around the raw `totalSales`: the promise of a request is
 * kept for CACHE_TTL milliseconds after it fulfils, and is dropped right away
 * when it rejects.
 *
 * @param {string} [product] - Product to compute the total for.
 * @returns {Promise<number>} The cached or newly created request promise.
 */
export function totalSales (product) {
  if (cache.has(product)) {
    console.log('Cache hit')
    return cache.get(product)
  }

  const resultPromise = totalSalesRaw(product)
  cache.set(product, resultPromise)
  resultPromise.then(() => {
    // Keep successful results around until the TTL expires.
    setTimeout(() => {
      cache.delete(product)
    }, CACHE_TTL)
  }, () => {
    // Failures are not cached. The error is intentionally not rethrown here:
    // callers receive it through `resultPromise`, and rethrowing would only
    // make this side chain reject without anyone handling it.
    cache.delete(product)
  })

  return resultPromise
}

异步取消

出错取消

手动制造错误 => 执行 catch

import { asyncRoutine } from './asyncRoutine.js'
import { CancelError } from './cancelError.js'

/**
 * Runs the async routines 'A', 'B' and 'C' one after another, logging each
 * result. After 'A' and after 'B' the cancel flag is inspected and a
 * CancelError is thrown when cancellation was requested.
 *
 * @param {{cancelRequested: boolean}} cancelObj - Shared flag object that the
 *   caller flips to request cancellation.
 * @returns {Promise<void>}
 * @throws {CancelError} When the flag is set at one of the checkpoints.
 */
async function cancelable (cancelObj) {
  const labels = ['A', 'B', 'C']

  for (const [position, label] of labels.entries()) {
    const outcome = await asyncRoutine(label)
    console.log(outcome)

    // There is no checkpoint after the last routine.
    const isLastStep = position === labels.length - 1
    if (!isLastStep && cancelObj.cancelRequested) {
      throw new CancelError()
    }
  }
}

// Flag object shared between this script and the running function.
const cancelObj = { cancelRequested: false }
// Start the function; a CancelError is reported as a cancellation,
// anything else as a regular error.
cancelable(cancelObj)
  .catch(err => {
    if (err instanceof CancelError) {
      console.log('Function canceled')
    } else {
      console.error(err)
    }
  })

// Request cancellation after 100 ms.
setTimeout(() => {
  cancelObj.cancelRequested = true
}, 100)

缺点: 每个异步操作执行完成后, 都要手动判断是否需要取消, 存在代码冗余

/**
 * Error used to signal that an asynchronous operation was canceled.
 * Instances carry `isCanceled === true`.
 */
export class CancelError extends Error {
  /**
   * @param {string} [message='Canceled'] - Human-readable description.
   */
  constructor (message = 'Canceled') {
    super(message)
    // Make logs and stack traces show the specific error type.
    this.name = 'CancelError'
    this.isCanceled = true
  }
}
/**
 * Simulates an asynchronous operation: logs when it starts and completes, and
 * resolves with a result string after a delay.
 *
 * @param {string} label - Name used in the log lines and in the result.
 * @param {number} [delayMs=100] - Simulated duration in milliseconds.
 * @returns {Promise<string>} Resolves with `Async routine <label> result`.
 */
export function asyncRoutine (label, delayMs = 100) {
  console.log(`Starting async routine ${label}`)
  return new Promise(resolve => {
    setTimeout(() => {
      console.log(`Async routine ${label} completed`)
      resolve(`Async routine ${label} result`)
    }, delayMs)
  })
}

包装器的使用

异步操作及判断逻辑抽取出来

具体使用

import { asyncRoutine } from './asyncRoutine.js'
import { createCancelWrapper } from './cancelWrapper.js'
import { CancelError } from './cancelError.js'

// Every asynchronous call has to go through the wrapper, which makes the
// code somewhat verbose.
/**
 * Runs the async routines 'A', 'B' and 'C' in sequence through the given
 * wrapper and logs each result.
 *
 * @param {Function} cancelWrapper - Called as `cancelWrapper(fn, ...args)`;
 *   its result is awaited, so a rejection stops the sequence.
 * @returns {Promise<void>}
 */
async function cancelable (cancelWrapper) {
  for (const label of ['A', 'B', 'C']) {
    const outcome = await cancelWrapper(asyncRoutine, label)
    console.log(outcome)
  }
}

// Create a wrapper together with the function that triggers its cancellation.
const { cancelWrapper, cancel } = createCancelWrapper()

// Start the function; a CancelError is reported as a cancellation,
// anything else as a regular error.
cancelable(cancelWrapper)
  .catch(err => {
    if (err instanceof CancelError) {
      console.log('Function canceled')
    } else {
      console.error(err)
    }
  })

// Request cancellation after 100 ms.
setTimeout(() => {
  cancel()
}, 100)

具体实现

import { CancelError } from './cancelError.js'

/**
 * Creates a pair of functions sharing one private cancellation flag.
 *
 * @returns {{cancelWrapper: Function, cancel: Function}}
 *   `cancelWrapper(func, ...args)` calls `func(...args)` and returns its
 *   result, or returns a promise rejected with a CancelError once `cancel()`
 *   has been called; `cancel()` raises the flag.
 */
export function createCancelWrapper () {
  // Private to this wrapper/cancel pair.
  const state = { canceled: false }

  const cancel = () => {
    state.canceled = true
  }

  const cancelWrapper = (func, ...args) =>
    state.canceled
      ? Promise.reject(new CancelError())
      : func(...args)

  return { cancelWrapper, cancel }
}

生成器实现叫停函数

使用

import { asyncRoutine } from './asyncRoutine.js'
import { createAsyncCancelable } from './createAsyncCancelable.js'
import { CancelError } from './cancelError.js'

// Build a cancelable function from a generator: every `yield` passes the
// promise returned by asyncRoutine to createAsyncCancelable's runner, and the
// awaited value comes back as the result of the `yield` expression.
const cancelable = createAsyncCancelable(function * () {
  const resA = yield asyncRoutine('A')
  console.log(resA)
  const resB = yield asyncRoutine('B')
  console.log(resB)
  const resC = yield asyncRoutine('C')
  console.log(resC)
})

// Start it: `promise` tracks the outcome, `cancel` requests cancellation.
const { promise, cancel } = cancelable()
// A CancelError is reported as a cancellation, anything else as a regular error.
promise.catch(err => {
  if (err instanceof CancelError) {
    console.log('Function canceled')
  } else {
    console.error(err)
  }
})

// Request cancellation after 100 ms.
setTimeout(() => {
  cancel()
}, 100)

实现

import { CancelError } from './cancelError.js'

/**
 * Turns a generator function into a cancelable asynchronous function.
 *
 * The generator yields values (usually promises); each one is awaited and the
 * settled value is sent back into the generator, or thrown into it when the
 * promise rejected.
 *
 * @param {GeneratorFunction} generatorFunction - Describes the async steps.
 * @returns {Function} `asyncCancelable(...args)` returning `{ promise, cancel }`:
 *   `promise` fulfils with the generator's return value, rejects with any
 *   error the generator does not handle, or rejects with a CancelError after
 *   `cancel()` was called.
 */
export function createAsyncCancelable (generatorFunction) {
  return function asyncCancelable (...args) {
    const generatorObject = generatorFunction(...args)
    let cancelRequested = false

    function cancel () {
      cancelRequested = true
    }

    // Closes the generator so that its `finally` blocks run, then signals the
    // cancellation. An error thrown by that cleanup is not swallowed: it
    // propagates instead of the CancelError.
    function abort () {
      generatorObject.return()
      throw new CancelError()
    }

    async function run () {
      // Pseudo step with nothing to wait for; the generator itself is started
      // by the first `next()` below.
      let step = { done: false, value: undefined }

      for (;;) {
        if (cancelRequested) {
          abort()
        }

        if (step.done) {
          return step.value
        }

        let settledValue
        let rejected = false
        try {
          settledValue = await step.value
        } catch (err) {
          settledValue = err
          rejected = true
        }

        // Cancellation may have been requested while waiting. Check again
        // before resuming the generator, otherwise one more step (and the
        // async work it starts) would run after cancel().
        if (cancelRequested) {
          abort()
        }

        // Errors thrown by the generator here reject the returned promise.
        step = rejected
          ? generatorObject.throw(settledValue)
          : generatorObject.next(settledValue)
      }
    }

    const promise = run()

    return { promise, cancel }
  }
}

举报

相关推荐

0 条评论