2014年3月2日日曜日

Pythonでtwitterの自動相互フォロー

botに自動返信機能を付けようと思った。
どうせならメッセージ内容から適当な単語を抽出して、人工無能的なリプライ内容にしたかった。
と思ったら、タイムラインを非公開にしている不届きな輩がいたため、botアカウントからは返信されたことにも気づけない。
やむなくbotに「自動フォロー返し」を実装することになった。

## 引数はTwitterオブジェクト
def check_friendship(twi):
    dic = {}
    # BOTからフォロー申請中のIDを列挙
    outgoing_ids = set(twi.friendships.outgoing(**dic)['ids'])
    print 'friendships/outgoing:', outgoing_ids
    # BOTがフォローしているIDを列挙
    dic['screen_name'] = BOT_NAME
    following_ids = set(twi.friends.ids(**dic)['ids'])
    print 'friends/ids:', following_ids
    # BOTをフォローしているIDを列挙
    followers_ids = set(twi.followers.ids(**dic)['ids'])
    print 'followers/ids:', followers_ids
    # BOTをフォローしており、BOTがフォローを申請していないIDを抽出
    diff = followers_ids.difference(following_ids,outgoing_ids)
    print 'difference', diff
    # フォロー申請を出す
    for i in diff:
        dic = {}
        dic['user_id'] = i
        twi.friendships.create(**dic)

とりあえず、TwitterのREST API名であるoutgoingとcreateが機能を連想しにくい。
あと、pythonのset型に対する集合操作が強力で驚いた。

2014年2月11日火曜日

RaspberryPiでtwitterのbotを作ったこと

RaspberryPiを買って3,4ヶ月くらい経ったけど、未だにユカイツーカイな用途を見出だせていない。
Linux入門用機材およびPython学習環境としては非常に有効ではあったが、別にそれはARMで無くて良かった筈なのだ。

とりあえず「常時運転させても心が傷まないから」という理由でTumblr botとTwitter botがcron cronとしてゐる。
というわけで、Twitter botをPythonで作るまでを簡単にメモしておく。
あ、ちなみにPythonは2.7です。

まず日本語の文字列を単語に分解したいので、MeCabね。MeCab。どうやってインストールしたんだったか、もう思い出せないや。
確かラズパイ上でmakeしたんだったっけ。
スクリプト言語上から実行できるよう、すでにお膳立てされてる。
https://mecab.googlecode.com/svn/trunk/mecab/doc/bindings.html
import MeCab
tagger = MeCab.Tagger('-Owakati')
words = tagger.parse('メロスは激怒した').strip().split(' ')
# words = ['メロス','は','激怒','し','た']
といった具合です。

で、Twitter APIをどうやって叩こうか。オースとかもよく分からないし。綴りも分からないし。
でも、やはり既にモジュールがあった。pip! pip!
https://pypi.python.org/pypi/twitter
ただ、ドキュメントが若干古いのか、TwitterのURIが変更されたらしく、モジュールのドキュメントの通りに書いても上手く行かない。で、TwitterのAPIドキュメントを読みながら書いた。
https://dev.twitter.com/docs/api/1.1#109
つまり、"GET statuses/user_timeline"したかったら
import twitter
twr = twitter.Twitter(auth=twitter.OAuth(OAUTH_TOKEN,OAUTH_SECRET,CONSUMER_KEY,CONSUMER_SECRET))
tl = twr.statuses.user_timeline(id='xxxxxxx')
#tlはこの場合は連想配列の配列になる。本来帰ってくるJSONのキー名に準ずる。
としてやる。ほかのURIも同様、Twitterオブジェクトのstatusesとかsearchといったメンバのメソッドを呼ぶかのようにしてAPIを叩くことができる。パラメータは全部、引数辞書(キーワード引数)で。

以上2つのモジュールで、やろうと思ってたことはだいたいできた。 botとしてやってることは、
  1. あらかじめ指定ユーザのツイートを取得してローカルに記録しておく
  2. 記録されているツイートをMeCabで分かち書きする
  3. 単語を一定の規則に従いつつランダムに並べ替える
  4. 並べ替えの過程で何らかのランダム度合いの点数化を行い、点数が低かったら3をやり直す
  5. できた文章をPOSTする
  6. 指定ユーザの新着ツイートを確認して、もしあれば記録
といった流れ。
3番の説明が案外面倒だったので、以下にソースを貼ってしまおう。
#!/usr/bin/python
# -*- coding: utf-8 -*-

import MeCab
import sys
import string
import random
import math
import re
import twitter

#cronから実行するときは絶対パス指定が必要らしい
DIR = '/home/pi/Desktop/'
#取得したツイートを記録するファイル
CSV = 'tweets.csv'
#適宜変更
OAUTH_TOKEN = "oauthtoken"
OAUTH_SECRET = "oauthsecret"
CONSUMER_KEY = "consumerkey"
CONSUMER_SECRET = "consumersecret"

#指定ユーザのタイムラインから特定のID範囲のツイートを取得する
def get_tweets(twi, count, max_id=None, since_id=None):
    dic = {}
    dic['id'] = 'SCREEN_NAME'
    dic['exclude_replies'] = True #リプライは除外
    dic['count'] = count
    if max_id:
        dic['max_id'] = max_id
    if since_id:
        dic['since_id'] = since_id
    tl = twi.statuses.user_timeline(**dic)
    print 'get_tweets() count:', len(tl)
    #IDをキーとする連想配列として出力
    return {int(tweet['id']): tweet['text'] for tweet in tl}

#CSVに記録したツイートを「分かち書き」に加工した上で取り出す
def read_tl_log(tagger):
    log = {}
    with open(DIR+CSV, 'r') as f:
        for l in f:
            items = l.split(',')
            if len(items) > 1:
                i = int(items[0])
                t = items[1]
                t = re.sub(r'\\n', r'\n', t)
                #ツイートのIDをキーとする
                log[i] = tagger.parse(t).strip().split(' ')
    print 'read_tl_log count:%d' % len(log)
    return log

#全てのツイートについて3単語の連鎖を全て取得
def gen_chain(log):
    heads = [] #全ツイートの冒頭の連鎖
    chains = [] #全ツイート中の全連鎖
    for key in log:
        words = log[key]
        if len(words) > 0 and re.search(r'RT', words[0]):
            continue
        #ツイート中の全ての3単語の連鎖を記録
        if len(words) > 2:
            for i in range(0, len(words)-2):
                chains.append( (words[i], words[i+1], words[i+2]) )
        #2単語以下の連鎖は文末の目印につかう
        if len(words) > 1:
            chains.append( (words[-2], words[-1],) )
        elif len(words) > 0:
            chains.append( (words[-1],) )
        #ツイートの冒頭の連鎖を記録
        if len(words) > 1:
            heads.append((words[0], words[1],))
        elif len(words) > 0:
            heads.append((words[0],))
    return chains, heads

#開いてるカッコをスタックする、もしくは開いていないカッコを閉じないための処理
def check_bracket(w, stack):
    if re.search(r'「', w):
        stack.append(r'」')
        print 're.search(r\'「\', %s), len(stack)):%d' % (w, len(stack))
    elif re.search(r'『', w):
        stack.append(r'』')
        print 're.search(r\'『\', %s), len(stack)):%d' % (w, len(stack))
    elif re.search(r'(', w):
        stack.append(r')')
        print 're.search(r\'(\', %s), len(stack)):%d' % (w, len(stack))
    elif re.search(r'【', w):
        stack.append(r'】')
        print 're.search(r\'【\', %s), len(stack)):%d' % (w, len(stack))
    elif len(stack) > 0 and re.search(stack[-1], w):
        bracket = stack.pop()
        print 're.search(%s, %s), len(stack)):%d' % (bracket, w, len(stack))
    elif re.search(r'」', w) or re.search(r'』', w) or re.search(r')', w) or re.search(r'】', w):
        print 'no stack and matches right bracket'
        return False
    return True
               
#マルコフっぽいことをする 
def gen_markov(chains, heads):
    gen_text = ''
    begin = random.choice(heads)
    bra_stack = []
    if len(begin) > 0:
        w0 = begin[0]
        if check_bracket(w0, bra_stack):
            gen_text += w0
    else:
        return gen_text
        
    if len(begin) == 1:
        return gen_text
        
    w1 = begin[1]
    if check_bracket(w1, bra_stack):
        gen_text += w1
    #連鎖の面白さを簡易的にスコア化するため単純なルールを設定した
    #  分岐の回数が多いほど高スコア
    #  分岐の枝数の平均値が高いほど高スコア
    #  枝の少ない分岐が続くほどスコアが下がる
    count = 1.0
    branch = 1.0
    while len(gen_text) < (120*3):
        sel = []
        if len(bra_stack) > 0:
            sel = [t for t in chains if len(t)>2 and t[0]==w0 and t[1]==w1 and t[2]==bra_stack[-1]]
            print 'select (%s,%s,%s) : %d' % (w0, w1, bra_stack[-1], len(sel))
        if len(sel) == 0:
            sel = [t for t in chains if len(t)>1 and t[0]==w0 and t[1]==w1]
        if len(sel) == 0:
            print 'len(sel) == 0: w0=%s, w1=%s' % (w0, w1)
            break
        print 'w0=%s,w1=%s,len(sel)=%d' % (w0,w1,len(sel))
        achain = random.choice(sel)
        if len(achain) < 3:
            print 'len(achain) < 3', w0, w1
            break
        branch *= (len(sel) * 0.5)
        w0 = achain[1]
        w1 = achain[2]
        if check_bracket(w1, bra_stack):
            gen_text += w1
        count += 1
    while len(gen_text) < (130*3) and len(bra_stack) > 0:
        gen_text += bra_stack.pop()
    print 'len=', len(gen_text)
    score = (count*branch)**(1.0/count) #この算出方法はテキトー
    print 'count=%f' % count
    print 'branch=%f' % branch
    print 'score=%f' % score
    print gen_text
    return gen_text, score

#ツイートをPOSTする
def post_text(twi, text):
    dic = {}
    dic['status'] = text
    #なんとなく適当なジオタグもつけてみる
    dic['lat'] = -90.0 + 180.0*random.random()
    dic['long'] = -180.0 + 360.0*random.random()
    twi.statuses.update(**dic)

#main処理
if __name__ == '__main__':
    try:
        t = twitter.Twitter(
            auth=twitter.OAuth(OAUTH_TOKEN, OAUTH_SECRET,
            CONSUMER_KEY, CONSUMER_SECRET)
             )
        print 'MeCab.VERSION ', MeCab.VERSION
        tagger = MeCab.Tagger("-Owakati")
        log = read_tl_log(tagger)
        chains, head_chains = gen_chain(log)
        print 'len(chains):%d' % len(chains)
        print 'len(head_chains):%d' % len(head_chains)
        score = 1.0
        while score < 1.4: #スコアが低かったらやり直し(閾値はテキトー)
            gen_text, score = gen_markov(chains, head_chains)
        #ツイートする
        post_text(t, gen_text)
        
        if len(log) > 0:
            max_id = max(log)
        else:
            max_id = 0
        print 'max_id:',max_id
        #取得した最大IDのツイートよりも新しいツイートを全て取得
        all_texts = {}
        texts = get_tweets(t, 200, None, max_id)
        while len(texts) > 0:
            max_id = max(texts)
            print 'max_id:',max_id
            all_texts.update(texts)
            texts = get_tweets(t, 200, None, max_id+1)
    except twitter.TwitterError, e:
        print "TwitterError:", e
        
    try:
        #取得した新着ツイートをCSVに追記する
        with open(DIR+CSV, 'a+') as f:
            print 'append all_texts len:%d' % len(all_texts)
            for key in all_texts:
                text = all_texts[key]
                text = re.sub(u'\r', r'', text)
                text = re.sub(u'\n', r'\\n', text)
                data = (u'%d,%s\n' % (key, text)).encode('utf8')
                f.write(data)
    except RuntimeError, e:
        print "RuntimeError:", e
    except UnicodeDecodeError, e:
        print "UnicodeDecodeError:", e
こんな具合になってます。