FumadocsZDecode
JavaScript

Web Worker 线程管理

一、开启线程

// 主线程
const worker = new Worker('worker.js')

worker.postMessage({ task: 'calculate', data: [1, 2, 3] })

worker.onmessage = (e) => {
  console.log('结果:', e.data)
}
// worker.js(独立文件,运行在新线程)
self.onmessage = (e) => {
  const { data } = e.data
  const result = data.reduce((a, b) => a + b, 0)
  self.postMessage(result)
}

Inline Worker(不需要单独文件)

const blob = new Blob(
  [
    `
  self.onmessage = (e) => {
    self.postMessage(e.data * 2)
  }
`,
  ],
  { type: 'application/javascript' },
)

const worker = new Worker(URL.createObjectURL(blob))

二、销毁线程

// 主线程强杀(立即终止,不管 worker 在干嘛)
worker.terminate()

// worker 内部自己关闭(更优雅)
self.close()

三、判断核心数

const cores = navigator.hardwareConcurrency // 如 8

// 留 1-2 个核给主线程和系统
const workerCount = Math.max(1, navigator.hardwareConcurrency - 2)

四、线程池

实际项目不应无限创建 Worker,用线程池控制数量。

class WorkerPool {
  constructor(workerScript, poolSize) {
    this.poolSize = poolSize || Math.max(1, navigator.hardwareConcurrency - 2)
    this.workers = []
    this.queue = []
    this.activeCount = 0

    for (let i = 0; i < this.poolSize; i++) {
      this.workers.push(new Worker(workerScript))
    }
  }

  run(data) {
    return new Promise((resolve, reject) => {
      const task = { data, resolve, reject }
      if (this.workers.length > 0) {
        this._dispatch(task)
      } else {
        this.queue.push(task) // 没有空闲 worker,排队
      }
    })
  }

  _dispatch(task) {
    const worker = this.workers.pop()
    this.activeCount++

    worker.onmessage = (e) => {
      task.resolve(e.data)
      this.activeCount--
      this.workers.push(worker) // 归还到池子

      if (this.queue.length > 0) {
        this._dispatch(this.queue.shift())
      }
    }

    worker.onerror = (e) => {
      task.reject(e)
      this.activeCount--
      this.workers.push(worker)
    }

    worker.postMessage(task.data)
  }

  destroy() {
    this.workers.forEach((w) => w.terminate())
    this.workers = []
    this.queue = []
  }
}

使用

const pool = new WorkerPool('worker.js', 4)

const results = await Promise.all([
  pool.run({ task: 'job1' }),
  pool.run({ task: 'job2' }),
  pool.run({ task: 'job3' }),
  pool.run({ task: 'job4' }),
  pool.run({ task: 'job5' }), // 前 4 个占满,排队等待
])

pool.destroy()

五、控制任务顺序

Worker 默认不保证顺序,有三种控制方式:

串行(一个完成再下一个)

async function runInOrder(pool, tasks) {
  const results = []
  for (const task of tasks) {
    results.push(await pool.run(task))
  }
  return results
}

并发执行,按原始顺序返回

// Promise.all 天然保证结果顺序与入参一致
const results = await Promise.all(tasks.map((task) => pool.run(task)))

带 ID 回传(手动对应)

// 主线程
worker.postMessage({ id: 3, data: 'xxx' })

// worker.js
self.onmessage = (e) => {
  const result = doWork(e.data.data)
  self.postMessage({ id: e.data.id, result })
}

六、注意事项

// 1. Worker 不能访问 DOM
document.getElementById('app') // ❌

// 2. postMessage 传递的是结构化克隆(深拷贝),大数据有性能开销
worker.postMessage(bigArray)

// 3. 大数据用 Transferable 转移所有权(零拷贝)
const buffer = new ArrayBuffer(1024 * 1024)
worker.postMessage(buffer, [buffer]) // 第二个参数是 transfer list
console.log(buffer.byteLength) // 0,所有权已转移给 worker

// 4. 共享内存:多个 worker 访问同一块内存
const shared = new SharedArrayBuffer(1024)
worker1.postMessage(shared)
worker2.postMessage(shared)
// 需配合 Atomics 避免竞态条件

七、总结

操作方法
创建线程new Worker('file.js')
通信postMessage / onmessage
销毁worker.terminate()self.close()
判断核心数navigator.hardwareConcurrency
线程池手动实现,预创建 + 任务队列
控制顺序Promise.all(并发有序)/ for await(串行)
大数据优化Transferable 零拷贝 / SharedArrayBuffer 共享内存

On this page