Amazon ECS上のタスクをGolangで操作する

この記事は Speeeアドベントカレンダー 16日目 の記事になります。 昨日は、 kosuke_nishayaさん による、「作ればわかる、FM音源」でした。

この記事では、バッチ環境としてAmazon ECS上でのタスク実行をGolangで管理するためのAPIの使い方を https://github.com/yoppi/ecs-launcher-go をもとに紹介します。

Amazon ECS(Elastic Container Service)とは

EC2上にDockerコンテナクラスタを構築し、コンテナ上で「タスク」という単位で任意の計算の実行環境を提供してくれるサービスです。

ECSは細かい単位でタスクを制御できることから、大量のタスクを同時に実行するのにAWS Batchより適しているアーキテクチャだと考えています。 デメリットとしてはECSクラスタの構築運用が煩雑的なところでしょう。

環境構築などはこの記事では言及しません。ドキュメント及び多くの方の記事を参考にしてください。

GoからECSのタスクを扱う

AWSコンソールから手動でタスクを実行可能ですが、多くの場合、事業やサービスにおいてさまざまな状態を扱う必要があるので、タスク実行をプログラムで制御したくなることでしょう。 ただ、AWS SDKAPIはほぼ自動生成されているため、全般的に読みにくいところが多いですが、基本的に使うAPIさえ俯瞰すれば難しくはありません。 ここではECSを扱う際に必要十分なAPIを紹介します。

ECSはタスク単位でDockerコンテナ上で実行されます。そこで、

  • タスク作成
  • タスク実行
  • タスク監視

これら、3つのAPIをもとに操作することになります。

タスク作成

// launcher.go

type ECSTask struct {
    Input  *ecs.RunTaskInput
    Output *ecs.RunTaskOutput
    doneCh chan struct{}
}

// example/hello.go

func createTasks() []*launcher.ECSTask {
    var tasks []*launcher.ECSTask

    def := "task-name"
    cluster := "cluster-name"
    container := "container-name"

    for i := 0; i < 10; i++ {
        tasks = append(tasks, launcher.NewECSTask(&ecs.RunTaskInput{
            TaskDefinition: aws.String(def),
            Cluster:        aws.String(cluster),
            Count:          aws.Int64(1),
            Overrides: &ecs.TaskOverride{
                ContainerOverrides: []*ecs.ContainerOverride{
                    {
                        Name: aws.String(container),
                        Environment: []*ecs.KeyValuePair{
                            {
                                Name:  aws.String("i"),
                                Value: aws.String(fmt.Sprint(i)),
                            },
                        },
                    },
                },
            },
        }))
    }

    return tasks
}

ECSのタスクの実行は、 RunTaskInput を入力として、 RunTaskOutput を生成します。

RunTaskInputは実行するクラスタ、タスク定義、そして環境変数を登録します。 タスクはECSはDockerコンテナ上で実行されるので、実行時に可変となる値をタスク内のプログラムで使う場合は、必ず環境変数を渡すことになります。 また、タスク実行は1度に一つとしています。そのためcountを1にしています。

RunTaskOutputは、ECSのDockerコンテナ上でタスクが登録されたときに返されます。 タスクのステータスや、タスクを一意に決定するARNもこの構造体に登録されています。

ECSLauncherはgoroutineで並行してタスクを実行するので、doneChを通じてそのタスクが完了したか(エラー含めて)を検知します。

タスク実行

func (t *ECSTask) Run(client *ecs.ECS) error {
    for {
        resp, err := client.RunTask(t.Input)
        if err != nil {
            if strings.Contains(err.Error(), "No Container Instances") {
                fmt.Printf("wait for launching instances:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            } else if strings.Contains(err.Error(), "ThrottlingException") {
                fmt.Printf("wait for becoming empty throttles:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            } else {
                return err
            }
        }

        if len(resp.Failures) > 0 {
            var isRetry bool

            for _, failure := range resp.Failures {
                reason := aws.StringValue(failure.Reason)
                if strings.HasPrefix(reason, "RESOURCE:") || reason == "AGENT" {
                    isRetry = true
                }
            }

            if isRetry {
                fmt.Printf("wait for releasing machine resource:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            }
        }

        if len(resp.Tasks) > 0 {
            t.Output = resp
            fmt.Printf("task started:%s\n", t.StringEnvs())
            break
        }

        // errがnilでFailures, Tasksが両方共空の場合もある
        fmt.Printf("fail RunTask():%s\n", t.StringEnvs())
        time.Sleep(30 * time.Second)
    }

    return nil
}

RunTask() がタスクを実行するAPIになります。 戻り値は、先程紹介したRunTaskOutput及びerrorになります。 このAPIを呼び出し後にerrorが返却されると、

の2点を、errorのメッセージから判別してリトライする必要があります。 このあたりがかなり煩雑で、せめてerrorの型を定義してほしいですね。

ECSクラスタインスタンスを起動するのにはある程度時間を要します。

バッチ目的で クラスタ内のインスタンス起動 → タスク実行

というJobFlowを組んでいたとするとこのタスク実行時には確実に間に合わずリトライする必要があります。

errorが返されず Failures が存在する場合もRunTask()に失敗しています。 クラスタ内のインスタンスにリソースが足りていない場合(いくつかタスクが実行されている状態)はリトライするようにします。

errorもなく、Failuresも0の場合で、Tasks が1つ以上ある場合は、RunTaskが成功したとみなせます。 しかし、errorもなくFailuresも0、Tasksも0といった状態もあり、これも考慮する必要があります。

タスク監視

func (t *ECSTask) Describe(client *ecs.ECS) {
    for {
        input := &ecs.DescribeTasksInput{
            Cluster: t.Input.Cluster,
            Tasks:   []*string{t.GetArn()},
        }

        resp, err := client.DescribeTasks(input)
        if err != nil {
            fmt.Printf("%v\n", err)
            time.Sleep(10 * time.Second)
            continue
        }

        if len(resp.Failures) > 0 {
            for _, failure := range resp.Failures {
                fmt.Printf("%v\n", failure)
            }
            time.Sleep(10 * time.Second)
            continue
        }

        if len(resp.Tasks) > 0 {
            task := resp.Tasks[0]
            if aws.StringValue(task.LastStatus) == "STOPPED" {
                var failed bool

                if len(task.Containers) > 0 {
                    for _, c := range task.Containers {
                        if c.ExitCode != nil && *c.ExitCode > 0 {
                            failed = true
                            fmt.Printf("fail task exitCode:%v reason:%v containerArn:%v taskArn:%v\n", *c.ExitCode, *c.Reason, *c.ContainerArn, *c.TaskArn)
                        }
                    }
                }

                if !failed {
                    fmt.Printf("finish task arn:%v elappsed:%v envs:%v\n", aws.StringValue(t.GetArn()), task.StartedAt, task.StoppedAt, t.StringEnvs())
                }

                t.doneCh <- struct{}{}

                break
            } else {
                fmt.Printf("task status:%v arn:%v envs:%v\n", aws.StringValue(task.LastStatus), aws.StringValue(t.GetArn()), t.StringEnvs())
            }
        }

        time.Sleep(10 * time.Second)
    }
}

実行中のタスクがどのような状況かを判別するためのAPIが、DescribeTasks()です。 引数としてDescribeTasksInput構造体を受取り、対象のタスクのARNとクラスタ名を指定して実行します。

重要なのは、レスポンスのTasksがある場合のハンドリングです。 ここからタスクのステータスを判別することができ、実行中もしくは停止している、かを判断します。 ステータスの値が STOPPED の場合タスクが停止しているとみなせます。 ただ、ただしく終了したかどうかは判別できないので、タスクの終了コードも見て判別します。 0より大きい場合、何らかの原因で終了したことがわかります。

まとめ

この記事ではAamazon ECS上でのタスクをGolangで扱うときのtips及をお届けしました。 明日は、bino98さんによる「Botのお話」です!楽しみですね!

伸長するsliceの取扱い

Goで何かしら値を計算した結果をsliceに格納するときにcapを事前に設定出来ない場合もある。

// capがわかってない場合
var slice []T
// cap(N)がわかっている場合
slice := make([]T, 0, n)

append時にcapが足りない場合、slice内部の値をコピーして伸長する。 そのため、capを事前に設定できない場合、sliceに格納するstructは実態ではなく参照にしておくことが望ましい。

package slice

import (
    "testing"
)

type T struct {
    A int
}

type T2 struct {
    A int
    B string
}

type T3 struct {
    A int
    B string
    C []string
}

func BenchmarkReal(b *testing.B) {
    var slices []T
    for i := 0; i < b.N; i++ {
        t := T{A: 1}
        slices = append(slices, t)
    }

}

func BenchmarkReal2(b *testing.B) {
    var slices []T2
    for i := 0; i < b.N; i++ {
        t := T2{A: 1, B: "B"}
        slices = append(slices, t)
    }
}

func BenchmarkReal3(b *testing.B) {
    var slices []T3
    for i := 0; i < b.N; i++ {
        t := T3{A: 1, B: "B", C: []string{"C"}}
        slices = append(slices, t)
    }

}

func BenchmarkPointer(b *testing.B) {
    var slices []*T
    for i := 0; i < b.N; i++ {
        t := &T{A: 1}
        slices = append(slices, t)
    }
}

func BenchmarkPointer2(b *testing.B) {
    var slices []*T2
    for i := 0; i < b.N; i++ {
        t := &T2{A: 1, B: "B"}
        slices = append(slices, t)
    }
}

func BenchmarkPointer3(b *testing.B) {
    var slices []*T3
    for i := 0; i < b.N; i++ {
        t := &T3{A: 1, B: "B", C: []string{"C"}}
        slices = append(slices, t)
    }
}
$ go test -bench=. -benchmem                                                                                    [~/src/github.com/yoppi/go]
goos: darwin
goarch: amd64
pkg: github.com/yoppi/go
BenchmarkReal-4         100000000               14.2 ns/op            49 B/op          0 allocs/op
BenchmarkReal2-4         5000000               220 ns/op             132 B/op          0 allocs/op
BenchmarkReal3-4         3000000               551 ns/op             272 B/op          1 allocs/op
BenchmarkPointer-4       5000000               235 ns/op              51 B/op          1 allocs/op
BenchmarkPointer2-4     10000000               220 ns/op              74 B/op          1 allocs/op
BenchmarkPointer3-4      5000000               240 ns/op             107 B/op          2 allocs/op
PASS
ok      github.com/yoppi/go     10.506s

興味深いのは、フィールドがint一つだけの場合参照ではなく実態の方が高速に扱えていること。このあたりもうちょっと調べる。

[WIP]文書間類似度の参考文献

最近、仕事で記事レコメンドアルゴリズムをコンテンツベース(文書間類似度)を用いて実装して、記事CTRを大幅(高いものだと700%くらい)に上げられた。 そのとき調べたことを雑にまとめておく。

論文

文書間類似度とはなんぞやを過不足なくまとまっていて良い。

OM-basedのためのtext tilingについて。

ISUCON6

毎年恒例のISUCONに参加(オシャレ怪盗スワロウテイル)してきました。今年で4回目の挑戦になります。

ISUCON6予選

いわゆる、はてなキーワードクローン実装です。

加えて、各機能が別アプリ(別プロセス)で動作しており相互間はAPIでやり取りするという構成でした。 厳密な実装をするならキャッシュ戦略があまり効かないアプリケーションです。ただ、今回ベンチマーカーでPOST後適切にリンクを生成していなくても一発failとならず減点だけとなるのでキャッシュで押し切ってもなんとかなったという感じでした。

時間内にやったことは、

  • まずは、マイクロサービス化されている実装(isutar、isuda)を統合する
  • データ永続化部分をMySQLからRedisに置き換え
  • さらに初期データをすべてhtmlfyしたものをRedisに乗せておく

といったところまで実装し、POSTが来たらキーワード毎のflagを見てキャッシュをパージ、というところまで実装したかったのですが時間が及ばず終了。 なんとか予選を突破できました。

ざわざわしていたGoですが、予選後、試してみてたしかにGoのregexpは遅く、正規表現ではなく単なる置換ですむので strings.Replacer 大躍進というのは良い知見でした。

ISUCON6本選

pixivさん作の問題だけあってpixiv sketchのミニマム実装で、使われている技術要素としてSSE、React、サーバサイドレンダリング、そしてDockerというモダンな構成でした。

最初難儀したのは、各ミドルウェア、アプリケーションがDocker Composeで動作しているので各アプリのログ出力がまったくない!という状態で、スタートしたので、ベンチマーカーの挙動を素早く追えなかったことでした。

Dockerだと、短時間に何回も再起動を繰り返すことになりかえって足かせになるので、

  • MySQLをDockerからhost側に移行
  • nginxをnodeの前に建てる(http2化)

ということを進めました。

ここで、すでに15:00くらいと大きく時間を使ってしまい、慌てることに。 加えてnginx化した段階で初期スコアより大きく下がりさらにさらにあわてることに。 nginx化したことにより、クライアントからのリクエスト要求を受け付けられるようになり、その結果、/img/:id で詰まり(大量のTIME_WAIT)、ベンチマーカーがワークロードを下げたことが原因でした。

ここをまず突破しないと先に進めないぞということで、

  • nginxのproxy_cache
  • nodeにおける /img:id をやめる

ということを戦略としました。 proxy_cacheは 画像の変更 = 画数の変更 ということから、リクエストパラメータにstroke_countを持たせる(/img/:id?s={stroke_count})戦略が良さそうとなりました。 そして、nodeでどうやらつまるのでRuby(thin)でリクエストを捌かせようようという試みでした。

この2つの戦略を進めようとしていましたが、完成させることができずタイムアップとなりました。 懇親会で他チームの話をきいているともうちょっとうまくやれた感じがして悔やまれる…

ちなみに、上記とは別にいろいろやっていたのですが、どれもまずはトップページのからの /img/:id を解決しないことには意味のないものばかりでした。

  • N+1を解決する
  • 特定のキャッシュのパージを実現するためにproxy_cacheではなく、nginxからmemcachedを参照する方法。結果としておそすぎてだめ(1リクエストに100msくらいかかっている)

予選と本選通じて

毎年ハードルが上がっているのを感じていますが、ISUCON6でまた一段難しさが増したという感じでした。 運営、問題作成者みなさん、ありがとうございました。

やはり3人というチーム構成はいいバランスになって良いなと思います(フォローしあいつつも効率的に動ける)。チームメンバに感謝を。

また来年もリベンジです。

fontdがCPUリソースを喰う

ここ1、2年(遅くなったり普通だったりしたりして原因がわからず放置していた...)、Macのターミナル(iTerm)で作業すると、 fontd プロセスがとても重くなりもっさりする現象が続いていた。

原因は、zshのプロンプトに機種依存文字を使っていたからだった。

たとえば、みんな大好きなゆのっちプロンプト。

✘╹◡╹✘

そして、gopherプロンプト。

( ◔ ౪◔)

こういったものを PROMPT に設定していると、ターミナルでのフォントがレンダリングされるタイミングでfontd serverへの問い合わせが発生し、CPUリソースを大幅に消費することでもっさりするのだった。

fontd プロセスがCPUリソースを喰う事例はググるとちらほら出てくるが、根本的な原因がわからず対処できていなかった。

https://github.com/yoppi/config/commit/e9494760fcf67f4e2eb7ba994be012c7c38c542c

平穏が戻ってきた。

goroutineのテストを同期的に行う

とある関数の評価値 ― 例えばファイルに文字列を書き込む ― をテストしたい場合、 その評価がgoroutine内だと、テスト側から実行しても、タイミングによって取得できないことがあります。 そこで、テスト側から評価するときにはchanelを渡す用にして、そのchanelに対して書き込むようにすることで同期処理できるようになります。

type SUTType struct {
  ...
  Out io.WriteCloser
  ...
}

func (o *SUTType) SUT() {
  ...
  go func() {
    o.Out.Write("aqours")
  }()
  ...
}

テスト対象がこんな感じのコードになってたりする。 のでこの、 Out をすげ替えてあげればいい。

func TestSUT(t *testing.T) {
  s := &SUTType{Out: &DummyOut{}}
  
  s.SUT()

  donewait := make(chan struct{}{})
  var result string
  go func() {
    result := <-out.Buf
    donewait <- struct{}{}
  }()

  <-donewait

  if result != "aqours" {
    t.Error("should be aqours")
  }
}

type DummyOut struct {
  Buf chan []byte
}

func (o *DummyOut) Write(p []byte) (int, error) {
  o.Buf <- p
  return 0, nil
}

func (o *DummyOut) Close() error {
  return nil
}

chanelを使うことで同期処理が可能になる、ということと、テスト可能なコードにする場合は、 インタフェースを持つことでテスト側で柔軟なコード ― 上記の例だと、 SUTType のOutの型が os.File だと途端にテストが面倒になる ― にできる典型例でした。

ISUCON5予選

例年どおり、 @f440 とチームを組み、優秀な若者 @corrupt952 を加えて挑戦しました。

準備

などの準備を一ヶ月くらい前から始めていました。

当日

お題が、mixiクローンのSNSとのことで、

  • @yoppiblog、@corrupt952はアプリの調査
  • @f440はインフラ整備、アクセスログやslow queryの解析

と役割を決めて走り始めます。 1時間ほど立って、調査と解析結果から方針を決めます。

  • MySQLが圧倒的に重たいのでここをなんとかしないと先にすすめない
  • 初期データ量が2GB弱と結構ボリュームがあるのでオンメモリ実装は厳しそう

という判断からRubyの参考実装をもとに正攻法で行くことに決めました。

1時間ほど経過...

アプリ全体を見渡すと、足あとはアプリと切り離すことが簡単そう、ロジックもRedisのSorted Setを使うだけで管理できると当たりを付けられたので Redisを使い始めようとしたのですが、indexを適切に効かせたSQLだけでも処理が重たくないと判断したので、Redisをここで投入はしませんでした。

4時間ほど経過...

indexページのSQLに苦戦、 あなたの友だちの〜 あたりをむりやり1SQLで処理させようと頑張ったのが敗因でした。 ここで3、4時間近く費やすことになります。 JOINすると、 commentbody を含めるととてもクエリが重たくなるので、そこは無理せず where inであとから取ってくることで高速に処理できるなぁと

残り1時間...

トップページも改善できずにただ時間だけが過ぎて行き、最終スコアが2000点弱という悲惨な結果でした。

反省点

毎回反省していることで、ひとつのことにこだわり過ぎないというのがありますが、今回もそれでした。 メンバーとあまり連携がとれなかった、というのもあります。 しかし、なんだかんだと今年もISUCONを楽しめました。運営の方々、ありがとうございました。

使ったツールなど