1 package twitter.manage;
\r
3 import java.io.IOException;
\r
4 import java.util.HashMap;
\r
5 import java.util.HashSet;
\r
6 import java.util.Map;
\r
7 import java.util.Set;
\r
9 import java.util.logging.Level;
\r
10 import java.util.logging.Logger;
\r
11 import twitter.action.streaming.TweetStreamingListener;
\r
12 import twitter4j.ConnectionLifeCycleListener;
\r
13 import twitter4j.DirectMessage;
\r
14 import twitter4j.FilterQuery;
\r
15 import twitter4j.Status;
\r
16 import twitter4j.StatusAdapter;
\r
17 import twitter4j.StatusDeletionNotice;
\r
18 import twitter4j.StatusListener;
\r
19 import twitter4j.StatusStream;
\r
20 import twitter4j.TwitterException;
\r
21 import twitter4j.TwitterStream;
\r
22 import twitter4j.TwitterStreamFactory;
\r
23 import twitter4j.User;
\r
24 import twitter4j.UserStreamAdapter;
\r
25 import twitter4j.auth.AccessToken;
\r
28 * 指定したキーワードをサーチする際に利用
\r
32 public class TweetSearchStream extends StatusAdapter implements Runnable, ConnectionLifeCycleListener{
\r
34 private TwitterStream twitterStream = null;
\r
36 private FilterQuery filter = null;
\r
38 private StatusStream statusStream = null;
\r
40 private Thread workingThread = null;
\r
42 private TweetManager tweetManager = null;
\r
44 private Map<String, TweetStreamingListener> listeners = null;
\r
46 private Map<Long, TweetStreamingListener> userListener = null;
\r
48 private Map<String, Long> lastUpdate = null;
\r
50 private Map<Long, Long> userLastUpdate = null;
\r
51 //streaming開始を行うかどうか
\r
52 private boolean isStarted = false;
\r
56 * @param consumerKey
\r
57 * @param consumerSecret
\r
58 * @param ac アクセストークン
\r
59 * @param tweetManager
\r
61 public TweetSearchStream(String consumerKey, String consumerSecret, AccessToken ac, TweetManager tweetManager) {
\r
62 this.tweetManager = tweetManager;
\r
63 this.twitterStream = new TwitterStreamFactory().getInstance();
\r
64 this.twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
\r
65 this.twitterStream.setOAuthAccessToken(ac);
\r
66 this.twitterStream.addConnectionLifeCycleListener(this);
\r
68 filter = new FilterQuery();
\r
69 listeners = new HashMap<String, TweetStreamingListener>();
\r
70 userListener = new HashMap<Long, TweetStreamingListener>();
\r
71 lastUpdate = new HashMap<String, Long>();
\r
72 userLastUpdate = new HashMap<Long, Long>();
\r
78 public void start() {
\r
79 this.isStarted = true;
\r
86 public void stop() {
\r
87 this.isStarted = false;
\r
88 this.twitterStream.cleanUp();
\r
89 if( this.statusStream != null ) {
\r
91 this.statusStream.close();
\r
92 } catch (IOException ex) {
\r
93 Logger.getLogger(TweetSearchStream.class.getName()).log(Level.SEVERE, null, ex);
\r
103 public void addSearchWord(String word, TweetStreamingListener listener) {
\r
104 listeners.put(word, listener);
\r
112 public void removeSearchWord(String word) {
\r
113 listeners.remove(word);
\r
122 public void addSearchUser(Long userid, TweetStreamingListener listener) {
\r
123 userListener.put(userid, listener);
\r
131 public void removeSearchUser(Long userid) {
\r
132 userListener.remove(userid);
\r
139 private void updateFilter() {
\r
140 if( this.isStarted ) {
\r
141 //指定したユーザの情報を取得するようにする
\r
142 Long[] users = userListener.keySet().toArray(new Long[0]);
\r
143 if( users != null ) {
\r
144 long[] usersLong = new long[users.length];
\r
145 for(int i=0; i < users.length; i++) {
\r
146 usersLong[i] = users[i];
\r
148 filter.follow(usersLong);
\r
150 //指定したワードの情報を取得するようにする
\r
151 String[] words = listeners.keySet().toArray(new String[0]);
\r
152 filter.track(words);
\r
153 workingThread = new Thread(this);
\r
154 workingThread.start();
\r
159 *指定したワードに対応するステータスを取得
\r
162 public void onStatus(Status status) {
\r
164 Set<String> keys = listeners.keySet();
\r
165 synchronized (listeners) {
\r
166 for(String word : keys) {
\r
167 if( status.getText().contains( word.toString() ) ) {
\r
168 TweetStreamingListener listener = listeners.get(word);
\r
169 listener.update(status);
\r
171 lastUpdate.put(word, status.getId());
\r
177 Set<Long> userKeys = userListener.keySet();
\r
178 synchronized(userListener) {
\r
179 for(long id : userKeys) {
\r
180 if( status.getUser().getId() == id ) {
\r
181 TweetStreamingListener listener = userListener.get(id);
\r
182 listener.update(status);
\r
184 userLastUpdate.put(id, status.getId());
\r
195 public long getLastUpdateID(String word) {
\r
196 Long id = lastUpdate.get(word);
\r
204 * ユーザの最終更新ステータスidの取得
\r
208 public long getUserLastUpdateID(long userid) {
\r
209 Long id = userLastUpdate.get(userid);
\r
220 public void onException(Exception ex) {
\r
221 ex.printStackTrace();
\r
225 public void run() {
\r
228 if( statusStream != null ) {
\r
229 //前回開いていたものがあったら閉じる
\r
230 statusStream.close();
\r
232 } catch (IOException e) {
\r
233 e.printStackTrace();
\r
235 statusStream = twitterStream.getFilterStream(filter);
\r
236 } catch (TwitterException e) {
\r
237 e.printStackTrace();
\r
239 for(; statusStream != null; ) {
\r
241 statusStream.next(this);
\r
242 }catch(Exception e) {
\r
243 e.printStackTrace();
\r
250 public void onConnect() {
\r
251 System.out.println("search started");
\r
255 public void onDisconnect() {
\r
256 System.out.println("search stopped");
\r
260 public void onCleanUp() {
\r