Photoruction工事中!

Photoructionの開発ブログです!

Amazon Athena 使ってみました

こんにちは、PhotoructionでWebエンジニアをしている田村です。

先日ログ検索機能の実装を担当しAWSのプロダクトである Amazon Athena について調査したのでどのようなサービスか簡単に説明したいと思います!

Amazon Athena とは

標準的なSQLAmazon S3に格納したデータを分析することを簡単に行えるサービスです。

AthenaにS3バケットをデータベースとして定義し、テーブルに対してクエリを実行することができます。

前提

実際は、S3バケット作成・ログファイルをS3に転送・IAMロール定義など準備が必要ですが、今回ここでは詳細に説明しませんので別途調べてみてください。

ちなみにIAMユーザーのポリシーは最低限こんな感じでとりあえずAthenaによる検索はできると思います。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload"
            ],
            "Resource": [
                "arn:aws:s3:::sample-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:ListAllMyBuckets"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

S3にログファイルを格納

下記のようなディレクトリ構成でログファイルを格納します。

sample-bucket/sample-logs 以下の2022 10 30 31 などは年・月・日を表しています。(詳細は後述)

sample-bucket
└── sample-logs
    └── 2022
        └── 10
            ├── 30
            │   ├── 20221030-1.log
            │   └── 20221030-2.log
            └── 31
                ├── 20221031-1.log
                └── 20221031-2.log

各ログファイルはJSON形式のレコードが1行ずつ蓄積していく形です。

{"log_date":"2022-10-30","item1":"value11","item2":"value21","item3":"value31"}
{"log_date":"2022-10-30","item1":"value12","item2":"value22","item3":"value32"}
{"log_date":"2022-10-30","item1":"value13","item2":"value23","item3":"value33"}

(…以下略)

Athenaでテーブル作成

下記のような定義のテーブルを作成します。

これは”sample_dbs3://sample-bucket/sample-logs/マッピングした samples という名前のテーブルを作成する”というような内容となります。 カラム定義はlog_dateitem3ですがこれはログファイルのレコードとなるJSONの項目と一致します。

CREATE EXTERNAL TABLE sample_db.samples (
    `log_date` STRING,
    `item1` STRING,
    `item2` STRING,
    `item3` STRING
)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://sample-bucket/sample-logs/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.year.type' = 'integer',
    'projection.year.range' = '2000,2200',
    'projection.year.digits' = '4',
    'projection.month.type' = 'integer',
    'projection.month.range' = '1,12',
    'projection.month.digits' = '2',
    'projection.day.type' = 'integer',
    'projection.day.range' = '1,31',
    'projection.day.digits' = '2',
    'storage.location.template' = 's3://sample-bucket/sample-logs/${year}/${month}/${day}'
);

PARTITIONED BY TBLPROPERTIES などが気になりますね。 これらはデータのパーティション分割のための定義となります。

Athenaの機能”データのパーティション分割

クエリによってスキャンされるデータの量を制限できるようになるため、パフォーマンスが向上し、コストが削減されます。

実はAthenaはスキャンしたデータ量によって料金がかかってきます。(スキャンされたデータ 1 TB あたり 5.00USD) 上記サンプルログはパッと見た感じでそれほど気になりませんが、Photoructionの操作ログを記録するとなると恐ろしく大量のレコード数になることが予想されます。

そのため、あらかじめ任意のキーでデータをパーティションに分割し、クエリで「この条件でスキャン対象を絞って検索結果を返してね」という記述をしてスキャン量を抑えることができます。

テーブル上の実際のレコード

SELECT * FROM sample_db.samples を実行するとこのような結果が得られます。

テーブル作成時のカラムには指定していなかったyear month day が一緒に返ってきました。

# log_date item1 item2 item3 year month day
1 2022-10-30 00:00:00 value11 value21 value31 2022 10 30
2 2022-10-30 00:00:00 value12 value22 value32 2022 10 30
3 2022-10-30 00:00:00 value13 value23 value33 2022 10 30
4 2022-10-30 00:00:00 value14 value24 value34 2022 10 30
5 2022-10-31 00:00:00 value11 value21 value31 2022 10 31
6 2022-10-31 00:00:00 value12 value22 value32 2022 10 31
7 2022-10-31 00:00:00 value13 value23 value33 2022 10 31
8 2022-10-31 00:00:00 value14 value24 value34 2022 10 31

つまり”PARTITIONED BY” “TBLPROPERTIES” とは

TBLPROPERTIESディレクトリとyear month dayマッピング(と併せて型や範囲も)定義していることになります。 sample-bucket/sample-logs/2022/10/30/xxxxxx.log の場合は、データとしてyear=2022 month=10 day=30 となります。

またPARTITIONED BY によって year month dayパーティション分割するということになります。

というわけでSQLSELECT * FROM sample_db.samples WHERE year = 2022 AND month = 10 day = 30 のようにすると、実際にはsample-bucket/sample-logs/2022/10/30/ 以下のファイルのみスキャン対象とし、それ以外のディレクトリはスキャンしないため、その分スキャン量を抑えられる(=サービスの利用料金を抑えることができる)ということになります。

実装サンプル

それでは実際にAWSSDKPHP版)を使用してAthenaを検索してみます。

前提:composerでaws/aws-sdk-php3.x系 を利用

// ①クエリ
$query = 'SELECT * FROM sample_db.samples WHERE year = 2022 AND month = 10 AND day = 30';

// ②Athenaクライアント
$athenaClient = new Aws\Athena\AthenaClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
    'credentials' => [
        'key' => 'SAMPLE_AWS_ACCESS_KEY_ID',
        'secret' => 'SAMPLE_AWS_SECRET_ACCESS_KEY',
    ],
]);

// ③クエリ実行
$startQueryResponse = $athenaClient->startQueryExecution([
    'QueryString' => $query,
    'ResultConfiguration' => [
        'OutputLocation' => 's3://sample-bucket/sample-results'
    ]
]);

// ④QueryExecutionId取得
$queryExecutionId = $startQueryResponse->get('QueryExecutionId');

for ($times=0; $times < 20; $times++) {
    // ⑤QueryExecutionIdを元に実行ステータスを取得
    $responseExecution = $athenaClient->getQueryExecution([
        'QueryExecutionId' => $queryExecutionId
    ]);
    $status = $responseExecution->get('QueryExecution')['Status']['State'];

    // ⑥ステータスが「QUEUED」(受付済み)、「RUNNING」(実行中)だったら1秒待って繰り返す
    if (in_array($status, ['QUEUED', 'RUNNING'])) {
        sleep(1);
        continue;
    }

    // ⑦ステータスが「SUCCEEDED」(成功)だったら結果を取得してみる
    if ($status === 'SUCCEEDED') {
        // ⑧QueryExecutionIdを元に結果セットを取得
        $responseResults = $athenaClient->getQueryResults([
            'QueryExecutionId' => $queryExecutionId
        ]);
        $resultSet = $responseResults->get('ResultSet');

        // meta情報
        $meta = $resultSet['ResultSetMetadata']['ColumnInfo'];

        // 検索結果
        $rows = $resultSet['Rows'];
    }

    break;
}

AthenaをSDKを使用して検索する場合、非同期処理で行われるので、実行後に状態を監視して完了するのを待ってから結果を得る必要があります。 よって、

  1. SDKクライアントの初期化(②)
  2. クエリ実行し、結果取得用キー取得(③④)
  3. 結果取得用キーを使ってステータスをチェック(⑤)
    1. 「実行中」なら少し待って再度ステータスチェック(⑥)
    2. 「成功」なら結果取得用キーを使用して検索結果を取得する(⑦⑧)

というような流れになります。

また、③のOutputLocationでログファイルを格納しているディレクトリとは別のS3ディレクトリを指定していますが、Athenaはクエリを実行して成功すると自動で結果セットのCSVをこのディレクトリに出力します。 ですので、例えば検索結果をフロントエンドで確認しつつクエリ結果CSVをダウンロードする機能なども容易に実装することが可能です。

最後に

いかがでしょうか?

今回ご紹介した機能はAthenaのごく一部の機能となり、実際には非常に多くの機能があります。 当然ですが実際にはAthena以外の箇所でログの出力方法・出力先、S3に転送するタイミングや手段などしっかり設計する必要があります。

いろいろと便利なサービスやツールを組み合わせて実装する機会はこれからも多々あるかと思います。 それらをうまく利用して作業時間の短縮やコストの削減を図り、本来時間をかけたい部分に注力してより良いプロダクトを作っていきたいと思います💪