public event EventHandler<PostDeletedEventArgs> PostDeleted;
public event EventHandler<UserStreamEventReceivedEventArgs> UserStreamEventReceived;
private DateTimeUtc _lastUserstreamDataReceived;
- private TwitterUserstream userStream;
+ private StreamAutoConnector userStreamConnector;
public class FormattedEvent
{
public bool IsUserstreamDataReceived
=> (DateTimeUtc.Now - this._lastUserstreamDataReceived).TotalSeconds < 31;
- private void userStream_StatusArrived(ITwitterStreamMessage message)
+ private void userStream_MessageReceived(ITwitterStreamMessage message)
{
this._lastUserstreamDataReceived = DateTimeUtc.Now;
=> this.UserStreamStopped?.Invoke(this, EventArgs.Empty);
public bool UserStreamActive
- => this.userStream != null && this.userStream.IsStreamActive;
+ => this.userStreamConnector != null && this.userStreamConnector.IsStreamActive;
public void StartUserStream()
{
- var newStream = new TwitterUserstream(this.Api);
+ var replies = this.AllAtReply ? "all" : null;
+ var streamObservable = this.Api.UserStreams(replies, this.TrackWord);
+ var newConnector = new StreamAutoConnector(streamObservable);
- newStream.StatusArrived += userStream_StatusArrived;
- newStream.Started += userStream_Started;
- newStream.Stopped += userStream_Stopped;
+ newConnector.MessageReceived += userStream_MessageReceived;
+ newConnector.Started += userStream_Started;
+ newConnector.Stopped += userStream_Stopped;
- newStream.Start(this.AllAtReply, this.TrackWord);
+ newConnector.Start();
- var oldStream = Interlocked.Exchange(ref this.userStream, newStream);
- oldStream?.Dispose();
+ var oldConnector = Interlocked.Exchange(ref this.userStreamConnector, newConnector);
+ oldConnector?.Dispose();
}
public void StopUserStream()
{
- var oldStream = Interlocked.Exchange(ref this.userStream, null);
- oldStream?.Dispose();
+ var oldConnector = Interlocked.Exchange(ref this.userStreamConnector, null);
+ oldConnector?.Dispose();
}
public void ReconnectUserStream()
{
- if (this.userStream != null)
+ if (this.userStreamConnector != null)
{
this.StartUserStream();
}
}
- private class TwitterUserstream : IDisposable
+ private class StreamAutoConnector : IDisposable
{
- public bool AllAtReplies { get; private set; }
- public string TrackWords { get; private set; }
+ private readonly TwitterStreamObservable streamObservable;
public bool IsStreamActive { get; private set; }
+ public bool IsDisposed { get; private set; }
- public event Action<ITwitterStreamMessage> StatusArrived;
+ public event Action<ITwitterStreamMessage> MessageReceived;
public event Action Stopped;
public event Action Started;
- private TwitterApi twitterApi;
-
private Task streamTask;
- private CancellationTokenSource streamCts;
+ private CancellationTokenSource streamCts = new CancellationTokenSource();
- public TwitterUserstream(TwitterApi twitterApi)
- => this.twitterApi = twitterApi;
+ public StreamAutoConnector(TwitterStreamObservable streamObservable)
+ => this.streamObservable = streamObservable;
- public void Start(bool allAtReplies, string trackwords)
+ public void Start()
{
- this.AllAtReplies = allAtReplies;
- this.TrackWords = trackwords;
-
var cts = new CancellationTokenSource();
this.streamCts = cts;
{
try
{
- await this.UserStreamLoop(cts.Token)
+ await this.StreamLoop(cts.Token)
.ConfigureAwait(false);
}
catch (OperationCanceledException) { }
this.Stopped?.Invoke();
}
- private async Task UserStreamLoop(CancellationToken cancellationToken)
+ private async Task StreamLoop(CancellationToken cancellationToken)
{
TimeSpan sleep = TimeSpan.Zero;
- for (;;)
+ for (; ; )
{
if (sleep != TimeSpan.Zero)
{
try
{
- var replies = this.AllAtReplies ? "all" : null;
-
- var observable = this.twitterApi.UserStreams(replies, this.TrackWords);
-
- await observable.ForEachAsync(x => this.StatusArrived?.Invoke(x), cancellationToken);
+ await this.streamObservable.ForEachAsync(
+ x => this.MessageReceived?.Invoke(x),
+ cancellationToken);
// キャンセルされていないのにストリームが終了した場合
sleep = TimeSpan.FromSeconds(30);
}
}
- private bool disposed = false;
-
public void Dispose()
{
- if (this.disposed)
+ if (this.IsDisposed)
return;
- this.disposed = true;
+ this.IsDisposed = true;
this.Stop();
this.Started = null;
this.Stopped = null;
- this.StatusArrived = null;
+ this.MessageReceived = null;
}
}
#endregion