原生不支持, 可通过消费者端批量累积, 或使用插件
消费者端批量累积
class BatchConsumer {
constructor(batchSize = 10, timeout = 1000) {
this.batch = [];
this.batchSize = batchSize;
this.timeout = timeout;
this.timer = null;
}
async consume(channel, queue) {
await channel.prefetch(batchSize * 2); // 预取足够消息
await channel.consume(queue, async (msg) => {
this.batch.push({
content: msg.content,
deliveryTag: msg.fields.deliveryTag
});
// 达到批量大小
if (this.batch.length >= this.batchSize) {
await this.processBatch(channel);
}
// 超时触发
if (!this.timer && this.batch.length > 0) {
this.timer = setTimeout(() => {
this.processBatch(channel);
}, this.timeout);
}
});
}
async processBatch(channel) {
}
}