bigquery-pythonでcreate_dataset, create_tableが動かない
bigquery-pythonを使って、ツイッターのデータをビッグクエリにインポートしようとしているのですができません。
どうやら、create_tableができていないみたいです。問題の部分だけ抽出すると以下のコードです。
import bigquery
# 認証情報はcredentials.jsonから取得。
# グーグルクラウドプラットフォームでサービスアカウントを作成して、「新しい秘密鍵の提供」から取得。
client = bigquery.get_client(json_key_file='credentials.json', readonly=False)
client.create_dataset(dataset_id='tiwtter') # Falseが返ってくる。datasetは作成されていない。
環境:
OS :Ubuntu 14.04.5 LTS
python:3.6.4
conda :4.3.34
bigquery:1.14.0
念の為以下にコードの全文を記載します。
import os
import sys
from datetime import timezone
import tweepy
import bigquery
import pandas as pd
# Twitterの認証情報を読み込む。
twitter_api = pd.read_csv('api_twitter.csv', names=['name', 'value'], header=None)
twitter_api
CONSUMER_KEY = twitter_api.loc[0, 'value']
CONSUMER_SECRET = twitter_api.loc[1, 'value']
ACCESS_TOKEN = twitter_api.loc[2, 'value']
ACCESS_TOKEN_SECRET = twitter_api.loc[3, 'value']
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# BigQueryの認証情報(credentials.json)を指定してBigQueryのクライアントを作成する。
# 明示的にreadonly=Falseとしないと書き込みができない。
client = bigquery.get_client(json_key_file='credentials.json', readonly=False)
DATASET_NAME = 'twitter' # BigQueryのデータ・セット名
TABLE_NAME = 'tweets' # BigQueryのテーブル名
# テーブルが存在しない場合は作成する。
if not client.check_table(DATASET_NAME, TABLE_NAME):
print('Creating table {0}.{1}'.format(DATASET_NAME, TABLE_NAME), file=sys.stderr)
# create_table()の第三引数にはスキーマを指定する。
client.create_table(DATASET_NAME, TABLE_NAME, [
{'name': 'id', 'type': 'string', 'description': 'ツイートのID'},
{'name': 'lang', 'type': 'string', 'description': 'ツイートの言語'},
{'name': 'screen_name', 'type': 'string', 'description': 'ユーザー名'},
{'name': 'text', 'type': 'string', 'description': 'ツイートの本文'},
{'name': 'created_at', 'type': 'timestamp', 'description': 'ツイートの日時'},
])
class MyStreamListener(tweepy.streaming.StreamListener):
"""
Streaming APIで取得したツイートを処理するクラス
"""
status_list = []
num_imported = 0
def on_status(self, status):
"""
ツイートを受信した時に呼び出されるメソッド。
引数: ツイートを表すStatusオブジェクト
"""
self.status_list.append(status) # Statusオブジェクトをstatus_listに追加する。
if len(self.status_list) >= 500:
# status_listに500件溜まったらBigQueryにインポートする。
if not push_to_bigquery(self.status_list):
# インポートに失敗した場合はFalseが帰ってくるのでエラーを出して終了する。
print('Failed to send to bigquery', file=sys.stderr)
return False
# num_importedを増やして、status_listを空にする。
self.num_imported += len(self.status_list)
self.status_list = []
print('Imported {0} rows'.format(self.num_imported), file=sys.stderr)
# 料金が高額にならないように、5000件をインポートしたらFalseを返して終了する。
# 継続的にインポートしたいときは次の2行をコメントアウトしてください。
if self.num_imported >= 5000:
return False
def push_to_bigquery(status_list):
"""
ツイートのリストをBigQueryにインポートする。
"""
# TweepyのStatusオブジェクトのリストからdictのリストに変換する。
rows = []
for status in status_list:
rows.append({
'id': status.id_str,
'lang': status.lang,
'screen_name': status.author.screen_name,
'text': status.text,
# datetimeオブジェクトをUTCのPOSIXタイムスタンプに変換する。
'created_at': status.created_at.replace(tzinfo=timezone.utc).timestamp()
})
# dictのリストをBigQueryにインポートする。
# 引数は順に、データ・セット名、テーブル名、行のリスト、行を一意に識別する列名を表す。
# insert_id_keyはエラーでリトライした時に重複しないようにするために使われるが、必須ではない。
return client.push_rows(DATASET_NAME, TABLE_NAME, rows, insert_id_key='id')
# Stream APIの読み込みを開始する。
print('Collecting tweets...', file=sys.stderr)
stream = tweepy.Stream(auth, MyStreamListener())
# 公開されているツイートをサンプリングしたストリームを受信する。
# 言語を指定していないので、あるゆる言語のツイートを取得できる。
stream.sample()