SQSから10000レコードのバッチをLambda + Goで高速で処理する方法

はじめに

こんにちはVoicyで技術責任者としてエンジニアリングマネージャとか色々やってますやまげん@yamageniiです。 最近Twitter頑張ってるので、よかったらフォローしてください。

どれだけマネジメントが忙しくなっても、プレイヤーとして現場に入りたいと考えていて ボトルネックにならない範囲やお助けタスクなどでまだまだ実装をしています。 今回は自分がVoicyで実装した時のTipsを紹介します。

早速本題に入るのですが、今回は「SQSから10000レコードのバッチをLambda + Goで高速で処理する方法」を紹介します。

SQSはスケーラブルなメッセージングキューで、大量のアクセスを取りこぼさず処理をすることができます。 SQSではメッセージ処理にLambdaを利用することができるので、サーバレスに大量のメッセージを処理するのは簡単になったと思います

ただし無限にスケールするメッセージをLambdaが無限に起動して処理をしてくれるなどというものではなくて、リソース制約があります。

例えばSQSでFIFOキューを利用しない場合はLambdaでは最大10000件のレコードを処理することができます。 AWS公式 Amazon SQS での Lambda の使用

その他にもLambdaは自動起動数などの制限もあります。コスト節約のためにLambda実行数は抑えたいと考える人は多いかもしれません。 そうなるとLambda1つあたりに処理するレコードはなるべく増やしたいはずで、最大数の10000件に近いレコードを処理したいはずです。 それらを直列で処理をしたいということはあまりなく、並列で実行したいユースケースがほとんどでしょう。

今回はそういったユースケースをGo言語で実現する方法を紹介します。

実行環境

Lambda Go 1.x

まずやること

公式ドキュメントを読みましょう。課題に詰まってから公式ドキュメントに戻ると大体書いてあるというのはあるあるです(反省してる顔) これを見るだけでも大量のレコードをSQSLambdaで処理する時のスループットに関わる様々な抑えるべきポイントがあります。

バッチサイズが 10 を超える場合は、MaximumBatchingWindowInSeconds パラメータも 1 秒以上に設定します。

Batch window (バッチウィンドウ) - 関数を呼び出す前にレコードを収集する最大時間 (秒数)。これが適用されるのは標準キューのみです。0 秒を超えるバッチウィンドウを使用している場合は、キューの可視性タイムアウトの処理時間の増加を考慮する必要があります。キューの可視性タイムアウトは、関数タイムアウトの 6 倍に MaximumBatchingWindowInSeconds の値を加えた時間に設定することをお勧めします。

Lambda は、一度に最大 5 つのバッチを処理します。これは、一度にメッセージのバッチ処理と処理を同時に実行できるワーカーが最大 5 つあることを意味します。メッセージがまだ利用可能な場合、Lambda はバッチを読み込むプロセスの数を 1 分あたり最大 60 インスタンスまで増やします。イベントソースマッピングによって同時に処理できるバッチの最大数は 1,000 です。詳細については、スケーリングと処理 を参照してください。

失敗したバッチ内のすべてのメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを行うには、イベントソースマッピングを設定するときに、値 ReportBatchItemFailures を FunctionResponseTypes リストに含めます。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

などなど

もし用語がわからない場合はBlackBeltで概要を掴んで公式ドキュメントを読んだり、ソリューションアーキテクトや専門家の方に相談しましょう。

ざっくりわかったら動かしてみて、自分の理解と相違がないかをFBしていくと理解が深まると思います

ここからは実際にLambdaとGo言語での実装を紹介しながらポイントを解説します。

Goroutineで無闇に10000レコード並列化しない

Lambdaにはランタイムのクオータ制限があり、ファイルディスクリプターは1024までしか利用できません。 無闇にネットワークアクセスを並列化するとファイルディスクリプターを消費しきってしまってエラーになってしまいます。 またLambdaのランタイムのリソースによっても効率的に処理できる同時並列数は変わってきます。

これらのことを踏まえた上で、「Goroutineの並列数」と「Lambdaのメモリ」を掛け算しながら一番高速で処理をできるものを探していきましょう。 これは実際に実行して試して探してみるしかないと思います。

最後にここではGoを並列数を指定して処理をする実装を紹介します。

バッチで失敗したアイテムのみSQSにレスポンスする

公式ドキュメントに書かれているように、LambdaでErrorで返すとバッチのレコード全てが失敗扱いになります。つまりもしレコードの一つで失敗していた場合でも全てが失敗扱いになります。

SQSLambdaではその問題を避けるために失敗したレコードをエラーレコードとしてレスポンスとして返すことができます。この実装方法も含めて紹介します。

実装紹介

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
)

// GoroutineConcurrency goroutineの並列数
const GoroutineConcurrency = 100

// SqsBatchResponse レスポンス
type SqsBatchResponse struct {
    BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
}

type BatchItemFailure struct {
    ItemIdentifier string `json:"itemIdentifier"`
}

// Handler エントリーポイントとなる関数
func Handler(ctx context.Context, sqsEvent events.SQSEvent) (SqsBatchResponse, error) {
    // Goroutineの並列をチャンネルで制限する
    sem := make(chan struct{}, GoroutineConcurrency)
    errMessageIdCh := make(chan string, len(sqsEvent.Records))
    var wg sync.WaitGroup

    for _, record := range sqsEvent.Records {
        sem <- struct{}{}
        wg.Add(1)
        go func(record events.SQSMessage) {
            defer wg.Done()
            defer func() { <-sem }()

            // 処理をする


            if err != nil {
                errMessageIdCh <- record.MessageId
                return
            }
        }(record)
    }
    go func() {
        defer close(sem)
        defer close(errMessageIdCh)
        wg.Wait()
    }()

    // 失敗したMessageをSQSに伝える
    sqsBatchResponse := SqsBatchResponse{
        BatchItemFailures: make([]BatchItemFailure, 0),
    }
    for messageId := range errMessageIdCh {
        sqsBatchResponse.BatchItemFailures = append(sqsBatchResponse.BatchItemFailures, BatchItemFailure{messageId})
    }

    return sqsBatchResponse, nil
}

func main() {
    lambda.Start(Handler)
}

並列の制限はチャネルのバッファサイズがいっぱいになると、送信をブロックする性質を利用して、セマフォとして扱って並列起動数=バッファサイズとして扱えるチャネルを利用しています。具体的な実装は以下です。

 sem := make(chan struct{}, GoroutineConcurrency)
    errMessageIdCh := make(chan string, len(sqsEvent.Records))
    var wg sync.WaitGroup

    for _, record := range sqsEvent.Records {
        sem <- struct{}{} // ここでブロックされる
        wg.Add(1)
        go func(record events.SQSMessage) {
            defer wg.Done()
            defer func() { <-sem }()

            // 処理をする


            if err != nil {
                errMessageIdCh <- record.MessageId
                return
            }
        }(record)
    }
    go func() {
        defer close(sem)
        defer close(errMessageIdCh)
        wg.Wait()
    }()

エラーの実装はgoroutineの処理でエラーになった場合エラーのチャネルに貯めておいて、以下の形でレスポンスを返します。 jsonになるのがポイントです

 // 失敗したMessageをSQSに伝える
    sqsBatchResponse := SqsBatchResponse{
        BatchItemFailures: make([]BatchItemFailure, 0),
    }
    for messageId := range errMessageIdCh {
        sqsBatchResponse.BatchItemFailures = append(sqsBatchResponse.BatchItemFailures, BatchItemFailure{messageId})
    }

終わりに

AWSの公式ドキュメントをベースにしながら、Go言語の実装をみてきました Go言語は並列にするのは簡単ですが、Lambdaのランタイム上で利用するとその分制約があり、つまりポイントがあります。 パブリッククラウドでリソースの制限はどうしても避けられないですが、しっかり内部を理解して実装をすることで、開発の効率を早めることができます。