created: 2023-11-24T02:39:02.292Z
JavaScript で CSV データを stream から yield する実装
Lambda/CloudFunction 上で、たくさんの CSV とか大きな CSV ファイルを加工してアップロードするみたいな処理はけっこうある。かなりある。 CSV をまるごとプロセスで読み込むにはメモリが足りなくなるので、nodejs の場合は stream をつかって処理をすることになる。
何年か前は stream 以外のインターフェイスで CSV を読み込める npm パッケージはあんまりなかったような気がするが、 csv-parse
がいつのまにか asyncIterator
に対応していたので、その辺を使いながら処理を挟み込んだりする方法をメモしておく。
import { Readable, type Stream, ReadStream } from "node:stream";
import { parse, type Options as ParseOptions } from "csv-parse";
import { mapKeys, camelCase, mapValues } from "lodash";
export type AsyncIterateCsvOption = Partial<
ParseOptions & {
camelizeKey: boolean;
replaceEmptyValueWithNull: boolean;
}
>;
export function asyncIterateCsv(
content: ReadStream | string | Buffer,
options: AsyncIterateCsvOption = {}
) {
const stream: Stream =
content instanceof Buffer || typeof content === "string"
? Readable.from(content)
: content;
// `camelizeKey`, `replaceEmptyValueWithNull` はかなりよく使う
// csv-parse の cast や on_record は他のオプションとの整合性が面倒そうなので自分でやる
const camelizeKey = delete options.camelizeKey;
const replaceEmptyValueWithNull = delete options.replaceEmptyValueWithNull;
const parser = stream.pipe(parse(options));
return {
async *[Symbol.asyncIterator]() {
for await (let record of parser) {
if (camelizeKey) {
record = mapKeys(record, (_, key) => camelCase(key));
}
if (replaceEmptyValueWithNull) {
record = mapValues(record, (value) => (value === "" ? null : value));
}
yield record;
}
},
};
}