created: 2021-12-29T05:49:59.000Z

nodejsでGCSからStreamしながらunzipしてファイルを取り出してShift-JISのCSVファイルをJSONLine形式にしてまたGCS上に出力する

こんな要件のデータ連携があるとする。

要件

  • 先方がGCPのCloudStorage(S3みたいなやつ)にファイルを置いてくれる
  • 数GBくらいのShift-JISエンコードされたCSVファイル
  • なぜか、ファイルはディレクトリに入った状態で、ディレクトリがzip圧縮されている

受け取ったデータはBigQueryに入れる。

実装方針

サーバ管理をしたくなかったので Cloud Function を使うことにしたいが、ファイル容量が数GBあるため、いったん全部解凍はメモリが足りなくてできない。nodejsのstreamで処理をすることにした。

  • GCS上のファイルからStreamで読み出し
  • zipなので解凍。出てきたディレクトリから目的のファイルの内容を読み出す
  • ファイルがShift-JISエンコードなのでデコード
  • CSVをパース
  • バリデーション処理。不正な行はログを出してスキップ
  • GCS上のファイルへ書き出し(ここもStream)

実装

こんな実装になった。ライブラリを組み合わせるだけで、自前で実装している部分はほとんどない。

import unzip from "unzipper";
import iconv from "iconv-lite";
import { parse as parseCsv } from "fast-csv";

const fn = async (srcGcsFile, destGcsFile, validator: any, tranform: any) => 
  srcGcsFile
    .createReadStream()
    .pipe(unzip.Parse())
    .on("entry", async (entry) => {
      const dest = destGcsFile.createWriteStream();
      entry
        .pipe(iconv.decodeStream("cp932"))
        .pipe(parseCsv({ headers: true }))
        .on("data", (data: any) => 
          validator(data) 
            ? dest.write(JSON.stringify(tranform(data)) + "\n")
            : console.error(`INVALID_ROW:${JSON.stringify(data)}`)
        );
    })
    .promise()

unzipper

zipファイルをdecompressしてくれるライブラリ。

unzip というライブラリがあったのだが、メンテナンスされなくなったのでforkされたものである。

zipをstreamでの読み出すのに対応しているが、そもそもzipファイルは複数ファイルを内包できる仕様になっているので、ファイルごとにon('entry')イベントが呼ばれるインターフェイスになっている。ファイルが複数入っているようなzipファイルの場合はon('entry')が何回も呼ばれる。

本来だと entry オブジェクトに生えているファイル名から目当てのファイルであるかチェックしてから処理をしたり、不要なファイルでは autodrain を呼んで内容を破棄する必要があるようだ。今回は1ファイルしかないはずなのでそのへんのケアは不要だった。

iconv-lite

nodejsのビルトインの機能でstreamされる文字列をdecodeするのが面倒そうだったのでライブラリをつかっている。特別使い方に癖はなく pipe するだけでdecodeされる。

fast-csv

csvをstream処理してくれるという、今回のにぴったりなライブラリ。

headers オプションをつけるとヘッダ行のカラム名からオブジェクトを生成してくれる。1行パースするごとにon('data')イベントが呼ばれるので、そこにやりたい処理を挟めばよい。

その他

GCSの nodejs クライアントはファイルオブジェクトに createWriteStreamcreateReadStream が生えているのでstreamでの読み込みと書き込みはそこからできる。