RaspberryPiを買って3,4ヶ月くらい経ったけど、未だにユカイツーカイな用途を見出だせていない。
Linux入門用機材およびPython学習環境としては非常に有効ではあったが、別にそれはARMで無くて良かった筈なのだ。
とりあえず「常時運転させても心が傷まないから」という理由でTumblr botとTwitter botがcron cronとしてゐる。
というわけで、Twitter botをPythonで作るまでを簡単にメモしておく。
あ、ちなみにPythonは2.7です。
まず日本語の文字列を単語に分解したいので、MeCabね。MeCab。どうやってインストールしたんだったか、もう思い出せないや。
確かラズパイ上でmakeしたんだったっけ。
スクリプト言語上から実行できるよう、すでにお膳立てされてる。
https://mecab.googlecode.com/svn/trunk/mecab/doc/bindings.html
import MeCab
tagger = MeCab.Tagger('-Owakati')
words = tagger.parse('メロスは激怒した').strip().split(' ')
# words = ['メロス','は','激怒','し','た']
といった具合です。
で、Twitter APIをどうやって叩こうか。オースとかもよく分からないし。綴りも分からないし。
でも、やはり既にモジュールがあった。pip! pip!
https://pypi.python.org/pypi/twitter
ただ、ドキュメントが若干古いのか、TwitterのURIが変更されたらしく、モジュールのドキュメントの通りに書いても上手く行かない。で、TwitterのAPIドキュメントを読みながら書いた。
https://dev.twitter.com/docs/api/1.1#109
つまり、"GET statuses/user_timeline"したかったら
import twitter
twr = twitter.Twitter(auth=twitter.OAuth(OAUTH_TOKEN,OAUTH_SECRET,CONSUMER_KEY,CONSUMER_SECRET))
tl = twr.statuses.user_timeline(id='xxxxxxx')
#tlはこの場合は連想配列の配列になる。本来帰ってくるJSONのキー名に準ずる。
としてやる。ほかのURIも同様、Twitterオブジェクトのstatusesとかsearchといったメンバのメソッドを呼ぶかのようにしてAPIを叩くことができる。パラメータは全部、引数辞書(キーワード引数)で。
以上2つのモジュールで、やろうと思ってたことはだいたいできた。
botとしてやってることは、
- あらかじめ指定ユーザのツイートを取得してローカルに記録しておく
- 記録されているツイートをMeCabで分かち書きする
- 単語を一定の規則に従いつつランダムに並べ替える
- 並べ替えの過程で何らかのランダム度合いの点数化を行い、点数が低かったら3をやり直す
- できた文章をPOSTする
- 指定ユーザの新着ツイートを確認して、もしあれば記録
といった流れ。
3番の説明が案外面倒だったので、以下にソースを貼ってしまおう。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import MeCab
import sys
import string
import random
import math
import re
import twitter
#cronから実行するときは絶対パス指定が必要らしい
DIR = '/home/pi/Desktop/'
#取得したツイートを記録するファイル
CSV = 'tweets.csv'
#適宜変更
OAUTH_TOKEN = "oauthtoken"
OAUTH_SECRET = "oauthsecret"
CONSUMER_KEY = "consumerkey"
CONSUMER_SECRET = "consumersecret"
#指定ユーザのタイムラインから特定のID範囲のツイートを取得する
def get_tweets(twi, count, max_id=None, since_id=None):
dic = {}
dic['id'] = 'SCREEN_NAME'
dic['exclude_replies'] = True #リプライは除外
dic['count'] = count
if max_id:
dic['max_id'] = max_id
if since_id:
dic['since_id'] = since_id
tl = twi.statuses.user_timeline(**dic)
print 'get_tweets() count:', len(tl)
#IDをキーとする連想配列として出力
return {int(tweet['id']): tweet['text'] for tweet in tl}
#CSVに記録したツイートを「分かち書き」に加工した上で取り出す
def read_tl_log(tagger):
log = {}
with open(DIR+CSV, 'r') as f:
for l in f:
items = l.split(',')
if len(items) > 1:
i = int(items[0])
t = items[1]
t = re.sub(r'\\n', r'\n', t)
#ツイートのIDをキーとする
log[i] = tagger.parse(t).strip().split(' ')
print 'read_tl_log count:%d' % len(log)
return log
#全てのツイートについて3単語の連鎖を全て取得
def gen_chain(log):
heads = [] #全ツイートの冒頭の連鎖
chains = [] #全ツイート中の全連鎖
for key in log:
words = log[key]
if len(words) > 0 and re.search(r'RT', words[0]):
continue
#ツイート中の全ての3単語の連鎖を記録
if len(words) > 2:
for i in range(0, len(words)-2):
chains.append( (words[i], words[i+1], words[i+2]) )
#2単語以下の連鎖は文末の目印につかう
if len(words) > 1:
chains.append( (words[-2], words[-1],) )
elif len(words) > 0:
chains.append( (words[-1],) )
#ツイートの冒頭の連鎖を記録
if len(words) > 1:
heads.append((words[0], words[1],))
elif len(words) > 0:
heads.append((words[0],))
return chains, heads
#開いてるカッコをスタックする、もしくは開いていないカッコを閉じないための処理
def check_bracket(w, stack):
if re.search(r'「', w):
stack.append(r'」')
print 're.search(r\'「\', %s), len(stack)):%d' % (w, len(stack))
elif re.search(r'『', w):
stack.append(r'』')
print 're.search(r\'『\', %s), len(stack)):%d' % (w, len(stack))
elif re.search(r'(', w):
stack.append(r')')
print 're.search(r\'(\', %s), len(stack)):%d' % (w, len(stack))
elif re.search(r'【', w):
stack.append(r'】')
print 're.search(r\'【\', %s), len(stack)):%d' % (w, len(stack))
elif len(stack) > 0 and re.search(stack[-1], w):
bracket = stack.pop()
print 're.search(%s, %s), len(stack)):%d' % (bracket, w, len(stack))
elif re.search(r'」', w) or re.search(r'』', w) or re.search(r')', w) or re.search(r'】', w):
print 'no stack and matches right bracket'
return False
return True
#マルコフっぽいことをする
def gen_markov(chains, heads):
gen_text = ''
begin = random.choice(heads)
bra_stack = []
if len(begin) > 0:
w0 = begin[0]
if check_bracket(w0, bra_stack):
gen_text += w0
else:
return gen_text
if len(begin) == 1:
return gen_text
w1 = begin[1]
if check_bracket(w1, bra_stack):
gen_text += w1
#連鎖の面白さを簡易的にスコア化するため単純なルールを設定した
# 分岐の回数が多いほど高スコア
# 分岐の枝数の平均値が高いほど高スコア
# 枝の少ない分岐が続くほどスコアが下がる
count = 1.0
branch = 1.0
while len(gen_text) < (120*3):
sel = []
if len(bra_stack) > 0:
sel = [t for t in chains if len(t)>2 and t[0]==w0 and t[1]==w1 and t[2]==bra_stack[-1]]
print 'select (%s,%s,%s) : %d' % (w0, w1, bra_stack[-1], len(sel))
if len(sel) == 0:
sel = [t for t in chains if len(t)>1 and t[0]==w0 and t[1]==w1]
if len(sel) == 0:
print 'len(sel) == 0: w0=%s, w1=%s' % (w0, w1)
break
print 'w0=%s,w1=%s,len(sel)=%d' % (w0,w1,len(sel))
achain = random.choice(sel)
if len(achain) < 3:
print 'len(achain) < 3', w0, w1
break
branch *= (len(sel) * 0.5)
w0 = achain[1]
w1 = achain[2]
if check_bracket(w1, bra_stack):
gen_text += w1
count += 1
while len(gen_text) < (130*3) and len(bra_stack) > 0:
gen_text += bra_stack.pop()
print 'len=', len(gen_text)
score = (count*branch)**(1.0/count) #この算出方法はテキトー
print 'count=%f' % count
print 'branch=%f' % branch
print 'score=%f' % score
print gen_text
return gen_text, score
#ツイートをPOSTする
def post_text(twi, text):
dic = {}
dic['status'] = text
#なんとなく適当なジオタグもつけてみる
dic['lat'] = -90.0 + 180.0*random.random()
dic['long'] = -180.0 + 360.0*random.random()
twi.statuses.update(**dic)
#main処理
if __name__ == '__main__':
try:
t = twitter.Twitter(
auth=twitter.OAuth(OAUTH_TOKEN, OAUTH_SECRET,
CONSUMER_KEY, CONSUMER_SECRET)
)
print 'MeCab.VERSION ', MeCab.VERSION
tagger = MeCab.Tagger("-Owakati")
log = read_tl_log(tagger)
chains, head_chains = gen_chain(log)
print 'len(chains):%d' % len(chains)
print 'len(head_chains):%d' % len(head_chains)
score = 1.0
while score < 1.4: #スコアが低かったらやり直し(閾値はテキトー)
gen_text, score = gen_markov(chains, head_chains)
#ツイートする
post_text(t, gen_text)
if len(log) > 0:
max_id = max(log)
else:
max_id = 0
print 'max_id:',max_id
#取得した最大IDのツイートよりも新しいツイートを全て取得
all_texts = {}
texts = get_tweets(t, 200, None, max_id)
while len(texts) > 0:
max_id = max(texts)
print 'max_id:',max_id
all_texts.update(texts)
texts = get_tweets(t, 200, None, max_id+1)
except twitter.TwitterError, e:
print "TwitterError:", e
try:
#取得した新着ツイートをCSVに追記する
with open(DIR+CSV, 'a+') as f:
print 'append all_texts len:%d' % len(all_texts)
for key in all_texts:
text = all_texts[key]
text = re.sub(u'\r', r'', text)
text = re.sub(u'\n', r'\\n', text)
data = (u'%d,%s\n' % (key, text)).encode('utf8')
f.write(data)
except RuntimeError, e:
print "RuntimeError:", e
except UnicodeDecodeError, e:
print "UnicodeDecodeError:", e
こんな具合になってます。