こんにちは、PhotoructionでWebエンジニアをしている田村です。
先日ログ検索機能の実装を担当しAWSのプロダクトである Amazon Athena について調査したのでどのようなサービスか簡単に説明したいと思います!
標準的なSQLでAmazon S3に格納したデータを分析することを簡単に行えるサービスです。
AthenaにS3バケットをデータベースとして定義し、テーブルに対してクエリを実行することができます。
前提
実際は、S3バケット作成・ログファイルをS3に転送・IAMロール定義など準備が必要ですが、今回ここでは詳細に説明しませんので別途調べてみてください。
ちなみにIAMユーザーのポリシーは最低限こんな感じでとりあえずAthenaによる検索はできると思います。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:*"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetTables",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload"
],
"Resource": [
"arn:aws:s3:::sample-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess"
],
"Resource": [
"*"
]
}
]
}
S3にログファイルを格納
下記のようなディレクトリ構成でログファイルを格納します。
sample-bucket/sample-logs
以下の2022
10
30
31
などは年・月・日を表しています。(詳細は後述)
sample-bucket
└── sample-logs
└── 2022
└── 10
├── 30
│ ├── 20221030-1.log
│ └── 20221030-2.log
└── 31
├── 20221031-1.log
└── 20221031-2.log
各ログファイルはJSON形式のレコードが1行ずつ蓄積していく形です。
例
{"log_date":"2022-10-30","item1":"value11","item2":"value21","item3":"value31"}
{"log_date":"2022-10-30","item1":"value12","item2":"value22","item3":"value32"}
{"log_date":"2022-10-30","item1":"value13","item2":"value23","item3":"value33"}
(…以下略)
Athenaでテーブル作成
下記のような定義のテーブルを作成します。
これは”sample_db
に s3://sample-bucket/sample-logs/
とマッピングした samples
という名前のテーブルを作成する”というような内容となります。
カラム定義はlog_date
〜item3
ですがこれはログファイルのレコードとなるJSONの項目と一致します。
CREATE EXTERNAL TABLE sample_db.samples (
`log_date` STRING,
`item1` STRING,
`item2` STRING,
`item3` STRING
)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://sample-bucket/sample-logs/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.year.type' = 'integer',
'projection.year.range' = '2000,2200',
'projection.year.digits' = '4',
'projection.month.type' = 'integer',
'projection.month.range' = '1,12',
'projection.month.digits' = '2',
'projection.day.type' = 'integer',
'projection.day.range' = '1,31',
'projection.day.digits' = '2',
'storage.location.template' = 's3://sample-bucket/sample-logs/${year}/${month}/${day}'
);
PARTITIONED BY
TBLPROPERTIES
などが気になりますね。
これらはデータのパーティション分割のための定義となります。
Athenaの機能”データのパーティション分割”
クエリによってスキャンされるデータの量を制限できるようになるため、パフォーマンスが向上し、コストが削減されます。
実はAthenaはスキャンしたデータ量によって料金がかかってきます。(スキャンされたデータ 1 TB あたり 5.00USD)
上記サンプルログはパッと見た感じでそれほど気になりませんが、Photoructionの操作ログを記録するとなると恐ろしく大量のレコード数になることが予想されます。
そのため、あらかじめ任意のキーでデータをパーティションに分割し、クエリで「この条件でスキャン対象を絞って検索結果を返してね」という記述をしてスキャン量を抑えることができます。
テーブル上の実際のレコード
SELECT * FROM sample_db.samples
を実行するとこのような結果が得られます。
テーブル作成時のカラムには指定していなかったyear
month
day
が一緒に返ってきました。
# |
log_date |
item1 |
item2 |
item3 |
year |
month |
day |
1 |
2022-10-30 00:00:00 |
value11 |
value21 |
value31 |
2022 |
10 |
30 |
2 |
2022-10-30 00:00:00 |
value12 |
value22 |
value32 |
2022 |
10 |
30 |
3 |
2022-10-30 00:00:00 |
value13 |
value23 |
value33 |
2022 |
10 |
30 |
4 |
2022-10-30 00:00:00 |
value14 |
value24 |
value34 |
2022 |
10 |
30 |
5 |
2022-10-31 00:00:00 |
value11 |
value21 |
value31 |
2022 |
10 |
31 |
6 |
2022-10-31 00:00:00 |
value12 |
value22 |
value32 |
2022 |
10 |
31 |
7 |
2022-10-31 00:00:00 |
value13 |
value23 |
value33 |
2022 |
10 |
31 |
8 |
2022-10-31 00:00:00 |
value14 |
value24 |
value34 |
2022 |
10 |
31 |
つまり”PARTITIONED BY” “TBLPROPERTIES” とは
TBLPROPERTIES
はディレクトリとyear
month
day
のマッピング(と併せて型や範囲も)定義していることになります。
sample-bucket/sample-logs/2022/10/30/xxxxxx.log の場合は、データとしてyear=2022
month=10
day=30
となります。
またPARTITIONED BY
によって year
month
day
でパーティション分割するということになります。
というわけでSQLをSELECT * FROM sample_db.samples WHERE year = 2022 AND month = 10 day = 30
のようにすると、実際にはsample-bucket/sample-logs/2022/10/30/
以下のファイルのみスキャン対象とし、それ以外のディレクトリはスキャンしないため、その分スキャン量を抑えられる(=サービスの利用料金を抑えることができる)ということになります。
実装サンプル
それでは実際にAWSのSDK(PHP版)を使用してAthenaを検索してみます。
前提:composerでaws/aws-sdk-php
の3.x系
を利用
// ①クエリ
$query = 'SELECT * FROM sample_db.samples WHERE year = 2022 AND month = 10 AND day = 30';
// ②Athenaクライアント
$athenaClient = new Aws\Athena\AthenaClient([
'region' => 'ap-northeast-1',
'version' => 'latest',
'credentials' => [
'key' => 'SAMPLE_AWS_ACCESS_KEY_ID',
'secret' => 'SAMPLE_AWS_SECRET_ACCESS_KEY',
],
]);
// ③クエリ実行
$startQueryResponse = $athenaClient->startQueryExecution([
'QueryString' => $query,
'ResultConfiguration' => [
'OutputLocation' => 's3://sample-bucket/sample-results'
]
]);
// ④QueryExecutionId取得
$queryExecutionId = $startQueryResponse->get('QueryExecutionId');
for ($times=0; $times < 20; $times++) {
// ⑤QueryExecutionIdを元に実行ステータスを取得
$responseExecution = $athenaClient->getQueryExecution([
'QueryExecutionId' => $queryExecutionId
]);
$status = $responseExecution->get('QueryExecution')['Status']['State'];
// ⑥ステータスが「QUEUED」(受付済み)、「RUNNING」(実行中)だったら1秒待って繰り返す
if (in_array($status, ['QUEUED', 'RUNNING'])) {
sleep(1);
continue;
}
// ⑦ステータスが「SUCCEEDED」(成功)だったら結果を取得してみる
if ($status === 'SUCCEEDED') {
// ⑧QueryExecutionIdを元に結果セットを取得
$responseResults = $athenaClient->getQueryResults([
'QueryExecutionId' => $queryExecutionId
]);
$resultSet = $responseResults->get('ResultSet');
// meta情報
$meta = $resultSet['ResultSetMetadata']['ColumnInfo'];
// 検索結果
$rows = $resultSet['Rows'];
}
break;
}
AthenaをSDKを使用して検索する場合、非同期処理で行われるので、実行後に状態を監視して完了するのを待ってから結果を得る必要があります。
よって、
- SDKクライアントの初期化(②)
- クエリ実行し、結果取得用キー取得(③④)
- 結果取得用キーを使ってステータスをチェック(⑤)
- 「実行中」なら少し待って再度ステータスチェック(⑥)
- 「成功」なら結果取得用キーを使用して検索結果を取得する(⑦⑧)
というような流れになります。
また、③のOutputLocation
でログファイルを格納しているディレクトリとは別のS3ディレクトリを指定していますが、Athenaはクエリを実行して成功すると自動で結果セットのCSVをこのディレクトリに出力します。
ですので、例えば検索結果をフロントエンドで確認しつつクエリ結果CSVをダウンロードする機能なども容易に実装することが可能です。
最後に
いかがでしょうか?
今回ご紹介した機能はAthenaのごく一部の機能となり、実際には非常に多くの機能があります。
当然ですが実際にはAthena以外の箇所でログの出力方法・出力先、S3に転送するタイミングや手段などしっかり設計する必要があります。
いろいろと便利なサービスやツールを組み合わせて実装する機会はこれからも多々あるかと思います。
それらをうまく利用して作業時間の短縮やコストの削減を図り、本来時間をかけたい部分に注力してより良いプロダクトを作っていきたいと思います💪