Gmail メッセージを全部取得して集計する

Gmail メッセージを全部取得して集計する
Photo by Paula Hayes / Unsplash

Gmail にメールがたくさん溜まっているので多い順に削除すれば結構容量空くんじゃないか?と思い全メールアドレスを集計しようと思いました、が、Gmail にそのような機能 (受信数多い順に集計する等) はなかったので Gmail API で自前で集計してみました。

最初は Google Apps Script でやろうとしたのですが、スクリプトの実行時間 6 分という制限があり、数万件のメールを集計するのは不可能だったので、最終的には Python でスクリプトを作成しました。

Gmail API を使用する準備

事前に GCP にログインして、 Gmail API の有効化、OAuth 同意画面の作成、認証情報の作成を行う必要があります。

認証情報の作成時に credentials.json が取得できるのでこれを使用してトークンを取得し、トークンを使用して Gmail API を呼び出す、という流れになります。

この辺の流れは以下のクイックスタートを実行するだけです。

Python のクイックスタート | Gmail | Google for Developers

Gmail メッセージを全部取得して集計するスクリプトを作成

要点は以下です。

  • users().messages().list() を使用してメッセージのリスト (メッセージ ID 等) を取得
    💡 一度に最大で 500 メッセージまでしか取得できないので、next token から次のメッセージを取得していきますが、この時 list_next() を使用することで自分でページングを実装する手間が省けます
  • メッセージ ID を元に users().messages().get() を使用してメッセージの詳細 (From メールアドレスやタイトル、本文等) を取得
  • [{"<From メールアドレス>": <出現回数>}, ...] な配列と From メールアドレスを突き合わせて、合致する場合は <出現回数> をインクリメントしていく
  • 配列を <出現回数> でソートして、トップ 10 など適当なところで出力

また、API を呼び出す部分はエラー時に再試行するようにしています。

retry という便利なライブラリがあるのでこれを使わせてもらっています。

再試行しないと、途中で処理が終わってしまい絶望します (数万件のメールの取得に数時間かかるので)。

エラー発生時点でトラフィックがなくなっている(処理が落ちている)
エラーが複数回発生してもトラフィックがなくならない(処理を継続)

コード全体は以下です。GItHub にも手順含め公開しています。


from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from collections import defaultdict
from retry import retry
import sys
import os
import re
SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
]
TOKEN_FILE_NAME = "token.json"
EMAIL_ADDRESS = sys.argv[0]

def creds():
    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        with open("token.json", 'w') as token:
            token.write(creds.to_json())
    return creds

def extract_email_addresses(text):
    match = re.search(r'<(.+?)>', text)
    if match:
        email = match.group(1)
        return email
    else:
        # マッチしない場合「"サンプル(EXAMPLE)" <info@example.com>」のような形式ではなく
        # info@example.com のようにアドレスのみの場合と判断。雑だけど。
        return text

def sort_email_counts(email_counts, top):
    print(f"\nTOP {top} は以下です。")
    sorded_email_counts_items = sorted(
        email_counts.items(), key=lambda x: x[1], reverse=True)
    for email, count in sorded_email_counts_items[:top]:
        print(f"{email}: {count}")

@retry(tries=3, delay=2)
def get_email_list(request):
    results = request.execute()
    return results

@retry(tries=3, delay=2)
def get_email_detail(service, message_id):
    txt = service.users().messages().get(
        userId='me', id=message_id).execute()
    return txt

def main():
    try:
        c = creds()
        service = build('gmail', 'v1', credentials=c)
        max_results = 500  # 最大 500
        email_counts = defaultdict(int)
        consumed_token = 0
        request = service.users().messages().list(
            userId='me', maxResults=max_results)
        while request is not None:
            results = get_email_list(request)
            consumed_token += max_results
            print(f"\n現時点で {consumed_token} 個のリストを取得しました。")
            messages = results.get('messages')
            for msg in messages:
                txt = get_email_detail(service, msg['id'])
                payload = txt['payload']
                headers = payload['headers']
                for d in headers:
                    if d['name'] == 'From':
                        sender = d['value']
                        sender_email_address = extract_email_addresses(sender)
                        email_counts[sender_email_address] += 1
                print("From: ", sender_email_address)
            # # 検証用に適当なところで切り上げるためのコード
            # if consumed_token > 19:
            #     break
            print("\n現時点の集計結果です。")
            sort_email_counts(email_counts, 10)
            request = service.users().messages().list_next(request, results)
        # 最終的にはトップ 50 を出す。
        sort_email_counts(email_counts, 50)
        return "end"
    except HttpError as error:
        print(f'An error occurred: {error}')

if __name__ == "__main__":
    main()

実行イメージ

以下のように集計したい Gmail アカウントのメールアドレスを指定して実行します。

  • $ python main.py <Gmail アドレス>

実行イメージはこんな感じです。取得した From メールアドレスを逐一出力しながら進んでいきます。ページング毎に取得した From メールアドレスのその時点での集計結果上位 10 を出力しつつ、最終的に集計が完了したら上位 50 を出力します。


$ python main.py <Gmail アドレス>

現時点で 500 個のリストを取得しました。
From:  <メールアドレス色々>
From:  <メールアドレス色々>
From:  <メールアドレス色々>

...

現時点の集計結果です。

TOP 10 は以下です。
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>

...

現時点で 33500 個のリストを取得しました。
From:  <メールアドレス色々>
From:  <メールアドレス色々>
From:  <メールアドレス色々>

...

現時点の集計結果です。

TOP 10 は以下です。
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>

...

TOP 50 は以下です。
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>
<メールアドレス色々>: <多い順にソートされた集計結果>
...

感想

実行から数時間かかりますが、上位 50 のランクをもとに多い順に削除を行ない、上位 5 くらいまでで数百 MB 空きが出来ました。 🎉

去年くらいに手動で頑張って削除して数 GB 空けたのですが、その時このスクリプトがあれば捗ったな…。