Amazon ECS上のタスクをGolangで操作する

この記事は Speeeアドベントカレンダー 16日目 の記事になります。 昨日は、 kosuke_nishayaさん による、「作ればわかる、FM音源」でした。

この記事では、バッチ環境としてAmazon ECS上でのタスク実行をGolangで管理するためのAPIの使い方を https://github.com/yoppi/ecs-launcher-go をもとに紹介します。

Amazon ECS(Elastic Container Service)とは

EC2上にDockerコンテナクラスタを構築し、コンテナ上で「タスク」という単位で任意の計算の実行環境を提供してくれるサービスです。

ECSは細かい単位でタスクを制御できることから、大量のタスクを同時に実行するのにAWS Batchより適しているアーキテクチャだと考えています。 デメリットとしてはECSクラスタの構築運用が煩雑的なところでしょう。

環境構築などはこの記事では言及しません。ドキュメント及び多くの方の記事を参考にしてください。

GoからECSのタスクを扱う

AWSコンソールから手動でタスクを実行可能ですが、多くの場合、事業やサービスにおいてさまざまな状態を扱う必要があるので、タスク実行をプログラムで制御したくなることでしょう。 ただ、AWS SDKAPIはほぼ自動生成されているため、全般的に読みにくいところが多いですが、基本的に使うAPIさえ俯瞰すれば難しくはありません。 ここではECSを扱う際に必要十分なAPIを紹介します。

ECSはタスク単位でDockerコンテナ上で実行されます。そこで、

  • タスク作成
  • タスク実行
  • タスク監視

これら、3つのAPIをもとに操作することになります。

タスク作成

// launcher.go

type ECSTask struct {
    Input  *ecs.RunTaskInput
    Output *ecs.RunTaskOutput
    doneCh chan struct{}
}

// example/hello.go

func createTasks() []*launcher.ECSTask {
    var tasks []*launcher.ECSTask

    def := "task-name"
    cluster := "cluster-name"
    container := "container-name"

    for i := 0; i < 10; i++ {
        tasks = append(tasks, launcher.NewECSTask(&ecs.RunTaskInput{
            TaskDefinition: aws.String(def),
            Cluster:        aws.String(cluster),
            Count:          aws.Int64(1),
            Overrides: &ecs.TaskOverride{
                ContainerOverrides: []*ecs.ContainerOverride{
                    {
                        Name: aws.String(container),
                        Environment: []*ecs.KeyValuePair{
                            {
                                Name:  aws.String("i"),
                                Value: aws.String(fmt.Sprint(i)),
                            },
                        },
                    },
                },
            },
        }))
    }

    return tasks
}

ECSのタスクの実行は、 RunTaskInput を入力として、 RunTaskOutput を生成します。

RunTaskInputは実行するクラスタ、タスク定義、そして環境変数を登録します。 タスクはECSはDockerコンテナ上で実行されるので、実行時に可変となる値をタスク内のプログラムで使う場合は、必ず環境変数を渡すことになります。 また、タスク実行は1度に一つとしています。そのためcountを1にしています。

RunTaskOutputは、ECSのDockerコンテナ上でタスクが登録されたときに返されます。 タスクのステータスや、タスクを一意に決定するARNもこの構造体に登録されています。

ECSLauncherはgoroutineで並行してタスクを実行するので、doneChを通じてそのタスクが完了したか(エラー含めて)を検知します。

タスク実行

func (t *ECSTask) Run(client *ecs.ECS) error {
    for {
        resp, err := client.RunTask(t.Input)
        if err != nil {
            if strings.Contains(err.Error(), "No Container Instances") {
                fmt.Printf("wait for launching instances:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            } else if strings.Contains(err.Error(), "ThrottlingException") {
                fmt.Printf("wait for becoming empty throttles:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            } else {
                return err
            }
        }

        if len(resp.Failures) > 0 {
            var isRetry bool

            for _, failure := range resp.Failures {
                reason := aws.StringValue(failure.Reason)
                if strings.HasPrefix(reason, "RESOURCE:") || reason == "AGENT" {
                    isRetry = true
                }
            }

            if isRetry {
                fmt.Printf("wait for releasing machine resource:%v\n", t.StringEnvs())
                time.Sleep(30 * time.Second)
                continue
            }
        }

        if len(resp.Tasks) > 0 {
            t.Output = resp
            fmt.Printf("task started:%s\n", t.StringEnvs())
            break
        }

        // errがnilでFailures, Tasksが両方共空の場合もある
        fmt.Printf("fail RunTask():%s\n", t.StringEnvs())
        time.Sleep(30 * time.Second)
    }

    return nil
}

RunTask() がタスクを実行するAPIになります。 戻り値は、先程紹介したRunTaskOutput及びerrorになります。 このAPIを呼び出し後にerrorが返却されると、

の2点を、errorのメッセージから判別してリトライする必要があります。 このあたりがかなり煩雑で、せめてerrorの型を定義してほしいですね。

ECSクラスタインスタンスを起動するのにはある程度時間を要します。

バッチ目的で クラスタ内のインスタンス起動 → タスク実行

というJobFlowを組んでいたとするとこのタスク実行時には確実に間に合わずリトライする必要があります。

errorが返されず Failures が存在する場合もRunTask()に失敗しています。 クラスタ内のインスタンスにリソースが足りていない場合(いくつかタスクが実行されている状態)はリトライするようにします。

errorもなく、Failuresも0の場合で、Tasks が1つ以上ある場合は、RunTaskが成功したとみなせます。 しかし、errorもなくFailuresも0、Tasksも0といった状態もあり、これも考慮する必要があります。

タスク監視

func (t *ECSTask) Describe(client *ecs.ECS) {
    for {
        input := &ecs.DescribeTasksInput{
            Cluster: t.Input.Cluster,
            Tasks:   []*string{t.GetArn()},
        }

        resp, err := client.DescribeTasks(input)
        if err != nil {
            fmt.Printf("%v\n", err)
            time.Sleep(10 * time.Second)
            continue
        }

        if len(resp.Failures) > 0 {
            for _, failure := range resp.Failures {
                fmt.Printf("%v\n", failure)
            }
            time.Sleep(10 * time.Second)
            continue
        }

        if len(resp.Tasks) > 0 {
            task := resp.Tasks[0]
            if aws.StringValue(task.LastStatus) == "STOPPED" {
                var failed bool

                if len(task.Containers) > 0 {
                    for _, c := range task.Containers {
                        if c.ExitCode != nil && *c.ExitCode > 0 {
                            failed = true
                            fmt.Printf("fail task exitCode:%v reason:%v containerArn:%v taskArn:%v\n", *c.ExitCode, *c.Reason, *c.ContainerArn, *c.TaskArn)
                        }
                    }
                }

                if !failed {
                    fmt.Printf("finish task arn:%v elappsed:%v envs:%v\n", aws.StringValue(t.GetArn()), task.StartedAt, task.StoppedAt, t.StringEnvs())
                }

                t.doneCh <- struct{}{}

                break
            } else {
                fmt.Printf("task status:%v arn:%v envs:%v\n", aws.StringValue(task.LastStatus), aws.StringValue(t.GetArn()), t.StringEnvs())
            }
        }

        time.Sleep(10 * time.Second)
    }
}

実行中のタスクがどのような状況かを判別するためのAPIが、DescribeTasks()です。 引数としてDescribeTasksInput構造体を受取り、対象のタスクのARNとクラスタ名を指定して実行します。

重要なのは、レスポンスのTasksがある場合のハンドリングです。 ここからタスクのステータスを判別することができ、実行中もしくは停止している、かを判断します。 ステータスの値が STOPPED の場合タスクが停止しているとみなせます。 ただ、ただしく終了したかどうかは判別できないので、タスクの終了コードも見て判別します。 0より大きい場合、何らかの原因で終了したことがわかります。

まとめ

この記事ではAamazon ECS上でのタスクをGolangで扱うときのtips及をお届けしました。 明日は、bino98さんによる「Botのお話」です!楽しみですね!