どうも、とがみんです。
前回の記事で、AWSのマネジメントコンソールからぽちぽちリソースを作成していきました。
この記事ではServerless Frameworkを活用して、Amazon Kinesis Data StreamからLambdaを通してデータを取得し、CloudWatchに出力する簡易アプリケーションを作成してみます。
今回作成するもの
今回作成するものは、前回コンソールから作成したものと同じで、 ローカルPCからKinesisにデータを送信し、Kinesisでの受信をトリガーにLambdaが起動し、CloudWatchにログが出力されるというものです。
作成手順概要
- Serverless Frameworkを活用したプロジェクトの作成
- コードの作成
- デプロイと動作確認
作成手順詳細
Serverless Frameworkを活用したプロジェクトの作成
Serverless Frameworkを活用したプロジェクトを作成します。
作業ディレクトリに移動し、下記のコマンドを実行して、Serverlessアプリケーションの雛形を作成します。
今回はPythonを使用します。
コードの作成とデプロイ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | service: serverless-kinesis frameworkVersion: '3' provider: name: aws runtime: python3.8 region: ap-northeast-1 functions: hello: handler: handler.helloStream role: !GetAtt KinesisLambdaTestRole.Arn events: - stream: type: kinesis arn: !GetAtt TestStream.Arn batchSize: 1 startingPosition: LATEST enabled: true resources: Resources: TestStream: Type: AWS::Kinesis::Stream Properties: Name: testStream ShardCount: 1 KinesisLambdaTestRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole |
functions
呼び出す関数はhandler.helloStreamで、resourcesに記載の権限を付与したロールをLambda関数にアタッチします。
Lambda関数のイベントトリガーはKinesisに設定し、Kinesisからのデータ取得単位であるbatchSizeは1にしています。
StartingPositionは、Kinesisのデータの読み取りを開始する場所を最新に指定しています。
イベントソースとのマッピングを有効化するために、enabledはtrueにしています。
TestStream
Kinesis Data Streamに関するコードを記述しています。
シャード数は1に設定しています。
KinesisLambdaTestRole
Lambda関数がKinesisに対するアクセスや、CloudWatchに対してログを書き込んだりする権限を付与します。
AWSによってデフォルトで準備されているAWSLambdaKinesisExecutionRoleのポリシーを付与しています。
1 2 3 4 5 6 7 | import json def helloStream(event, context): for record in event['Records']: payload = record['kinesis']['data'] print("payload: " + payload) return 'Successfully processed {} records.'.format(len(event['Records'])) |
関数の処理内容としては、Kinesisへのデータ取得をトリガーに、上記関数が動作し、取得したデータをログに出力するというシンプルな処理になります。
デプロイと動作確認
コードが記載できたら下記のコマンドを実行し、デプロイします。
デプロイが完了すると、KinesisやLambda関数が作成されていることが確認できます。
下記のコマンドを実行し、Kinesisにデータを入れて動作を確認します。
投入したデータがCloudWatchのログに出力されていることが確認できました。