Linux入門用機材およびPython学習環境としては非常に有効ではあったが、別にそれはARMで無くて良かった筈なのだ。
とりあえず「常時運転させても心が傷まないから」という理由でTumblr botとTwitter botがcron cronとしてゐる。
というわけで、Twitter botをPythonで作るまでを簡単にメモしておく。
あ、ちなみにPythonは2.7です。
まず日本語の文字列を単語に分解したいので、MeCabね。MeCab。どうやってインストールしたんだったか、もう思い出せないや。
確かラズパイ上でmakeしたんだったっけ。
スクリプト言語上から実行できるよう、すでにお膳立てされてる。
https://mecab.googlecode.com/svn/trunk/mecab/doc/bindings.html
import MeCab tagger = MeCab.Tagger('-Owakati') words = tagger.parse('メロスは激怒した').strip().split(' ') # words = ['メロス','は','激怒','し','た']といった具合です。
で、Twitter APIをどうやって叩こうか。オースとかもよく分からないし。綴りも分からないし。
でも、やはり既にモジュールがあった。pip! pip!
https://pypi.python.org/pypi/twitter
ただ、ドキュメントが若干古いのか、TwitterのURIが変更されたらしく、モジュールのドキュメントの通りに書いても上手く行かない。で、TwitterのAPIドキュメントを読みながら書いた。
https://dev.twitter.com/docs/api/1.1#109
つまり、"GET statuses/user_timeline"したかったら
import twitter twr = twitter.Twitter(auth=twitter.OAuth(OAUTH_TOKEN,OAUTH_SECRET,CONSUMER_KEY,CONSUMER_SECRET)) tl = twr.statuses.user_timeline(id='xxxxxxx') #tlはこの場合は連想配列の配列になる。本来帰ってくるJSONのキー名に準ずる。としてやる。ほかのURIも同様、Twitterオブジェクトのstatusesとかsearchといったメンバのメソッドを呼ぶかのようにしてAPIを叩くことができる。パラメータは全部、引数辞書(キーワード引数)で。
以上2つのモジュールで、やろうと思ってたことはだいたいできた。 botとしてやってることは、
- あらかじめ指定ユーザのツイートを取得してローカルに記録しておく
- 記録されているツイートをMeCabで分かち書きする
- 単語を一定の規則に従いつつランダムに並べ替える
- 並べ替えの過程で何らかのランダム度合いの点数化を行い、点数が低かったら3をやり直す
- できた文章をPOSTする
- 指定ユーザの新着ツイートを確認して、もしあれば記録
3番の説明が案外面倒だったので、以下にソースを貼ってしまおう。
#!/usr/bin/python # -*- coding: utf-8 -*- import MeCab import sys import string import random import math import re import twitter #cronから実行するときは絶対パス指定が必要らしい DIR = '/home/pi/Desktop/' #取得したツイートを記録するファイル CSV = 'tweets.csv' #適宜変更 OAUTH_TOKEN = "oauthtoken" OAUTH_SECRET = "oauthsecret" CONSUMER_KEY = "consumerkey" CONSUMER_SECRET = "consumersecret" #指定ユーザのタイムラインから特定のID範囲のツイートを取得する def get_tweets(twi, count, max_id=None, since_id=None): dic = {} dic['id'] = 'SCREEN_NAME' dic['exclude_replies'] = True #リプライは除外 dic['count'] = count if max_id: dic['max_id'] = max_id if since_id: dic['since_id'] = since_id tl = twi.statuses.user_timeline(**dic) print 'get_tweets() count:', len(tl) #IDをキーとする連想配列として出力 return {int(tweet['id']): tweet['text'] for tweet in tl} #CSVに記録したツイートを「分かち書き」に加工した上で取り出す def read_tl_log(tagger): log = {} with open(DIR+CSV, 'r') as f: for l in f: items = l.split(',') if len(items) > 1: i = int(items[0]) t = items[1] t = re.sub(r'\\n', r'\n', t) #ツイートのIDをキーとする log[i] = tagger.parse(t).strip().split(' ') print 'read_tl_log count:%d' % len(log) return log #全てのツイートについて3単語の連鎖を全て取得 def gen_chain(log): heads = [] #全ツイートの冒頭の連鎖 chains = [] #全ツイート中の全連鎖 for key in log: words = log[key] if len(words) > 0 and re.search(r'RT', words[0]): continue #ツイート中の全ての3単語の連鎖を記録 if len(words) > 2: for i in range(0, len(words)-2): chains.append( (words[i], words[i+1], words[i+2]) ) #2単語以下の連鎖は文末の目印につかう if len(words) > 1: chains.append( (words[-2], words[-1],) ) elif len(words) > 0: chains.append( (words[-1],) ) #ツイートの冒頭の連鎖を記録 if len(words) > 1: heads.append((words[0], words[1],)) elif len(words) > 0: heads.append((words[0],)) return chains, heads #開いてるカッコをスタックする、もしくは開いていないカッコを閉じないための処理 def check_bracket(w, stack): if re.search(r'「', w): stack.append(r'」') print 're.search(r\'「\', %s), len(stack)):%d' % (w, len(stack)) elif re.search(r'『', w): stack.append(r'』') print 're.search(r\'『\', %s), len(stack)):%d' % (w, len(stack)) elif re.search(r'(', w): stack.append(r')') print 're.search(r\'(\', %s), len(stack)):%d' % (w, len(stack)) elif re.search(r'【', w): stack.append(r'】') print 're.search(r\'【\', %s), len(stack)):%d' % (w, len(stack)) elif len(stack) > 0 and re.search(stack[-1], w): bracket = stack.pop() print 're.search(%s, %s), len(stack)):%d' % (bracket, w, len(stack)) elif re.search(r'」', w) or re.search(r'』', w) or re.search(r')', w) or re.search(r'】', w): print 'no stack and matches right bracket' return False return True #マルコフっぽいことをする def gen_markov(chains, heads): gen_text = '' begin = random.choice(heads) bra_stack = [] if len(begin) > 0: w0 = begin[0] if check_bracket(w0, bra_stack): gen_text += w0 else: return gen_text if len(begin) == 1: return gen_text w1 = begin[1] if check_bracket(w1, bra_stack): gen_text += w1 #連鎖の面白さを簡易的にスコア化するため単純なルールを設定した # 分岐の回数が多いほど高スコア # 分岐の枝数の平均値が高いほど高スコア # 枝の少ない分岐が続くほどスコアが下がる count = 1.0 branch = 1.0 while len(gen_text) < (120*3): sel = [] if len(bra_stack) > 0: sel = [t for t in chains if len(t)>2 and t[0]==w0 and t[1]==w1 and t[2]==bra_stack[-1]] print 'select (%s,%s,%s) : %d' % (w0, w1, bra_stack[-1], len(sel)) if len(sel) == 0: sel = [t for t in chains if len(t)>1 and t[0]==w0 and t[1]==w1] if len(sel) == 0: print 'len(sel) == 0: w0=%s, w1=%s' % (w0, w1) break print 'w0=%s,w1=%s,len(sel)=%d' % (w0,w1,len(sel)) achain = random.choice(sel) if len(achain) < 3: print 'len(achain) < 3', w0, w1 break branch *= (len(sel) * 0.5) w0 = achain[1] w1 = achain[2] if check_bracket(w1, bra_stack): gen_text += w1 count += 1 while len(gen_text) < (130*3) and len(bra_stack) > 0: gen_text += bra_stack.pop() print 'len=', len(gen_text) score = (count*branch)**(1.0/count) #この算出方法はテキトー print 'count=%f' % count print 'branch=%f' % branch print 'score=%f' % score print gen_text return gen_text, score #ツイートをPOSTする def post_text(twi, text): dic = {} dic['status'] = text #なんとなく適当なジオタグもつけてみる dic['lat'] = -90.0 + 180.0*random.random() dic['long'] = -180.0 + 360.0*random.random() twi.statuses.update(**dic) #main処理 if __name__ == '__main__': try: t = twitter.Twitter( auth=twitter.OAuth(OAUTH_TOKEN, OAUTH_SECRET, CONSUMER_KEY, CONSUMER_SECRET) ) print 'MeCab.VERSION ', MeCab.VERSION tagger = MeCab.Tagger("-Owakati") log = read_tl_log(tagger) chains, head_chains = gen_chain(log) print 'len(chains):%d' % len(chains) print 'len(head_chains):%d' % len(head_chains) score = 1.0 while score < 1.4: #スコアが低かったらやり直し(閾値はテキトー) gen_text, score = gen_markov(chains, head_chains) #ツイートする post_text(t, gen_text) if len(log) > 0: max_id = max(log) else: max_id = 0 print 'max_id:',max_id #取得した最大IDのツイートよりも新しいツイートを全て取得 all_texts = {} texts = get_tweets(t, 200, None, max_id) while len(texts) > 0: max_id = max(texts) print 'max_id:',max_id all_texts.update(texts) texts = get_tweets(t, 200, None, max_id+1) except twitter.TwitterError, e: print "TwitterError:", e try: #取得した新着ツイートをCSVに追記する with open(DIR+CSV, 'a+') as f: print 'append all_texts len:%d' % len(all_texts) for key in all_texts: text = all_texts[key] text = re.sub(u'\r', r'', text) text = re.sub(u'\n', r'\\n', text) data = (u'%d,%s\n' % (key, text)).encode('utf8') f.write(data) except RuntimeError, e: print "RuntimeError:", e except UnicodeDecodeError, e: print "UnicodeDecodeError:", eこんな具合になってます。
0 件のコメント:
コメントを投稿