我使用 LinqToTwitter 创建了一个 observable 集合IObservable<Tweet>
,如下所示。问题是当我处理第一个可观察对象并订阅新的可观察对象时,此实现存在并发问题。
如何正确处理第一个 observable?
(下面的示例应该是完整的并且可以正常工作,只需添加引用的包和 Twitter 凭据。)
这是发生此问题的示例:
using System;
using System.Reactive.Linq;
namespace Twitter.Cli
{
class Program
{
public static void Main(string[] args)
{
var twitter = new TwitterApi.Twitter();
var search1 = twitter.AllTweetsAbout("windows")
.Sample(TimeSpan.FromSeconds(1));
var search2 = twitter.AllTweetsAbout("android")
.Sample(TimeSpan.FromSeconds(1));
var sub = search1.Subscribe(
x =>
Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));
Console.ReadLine();
sub.Dispose();
/*
* If you stop the processing here for a while so that the StartAsync method can be executed
* within the closure everything works fine because disposed is set to true
* before the second observable is created
*/
//Console.ReadLine();
search2.Subscribe(
x =>
Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));
Console.ReadLine();
}
}
}
如果在StartAsync
创建第二个 observable 之前执行第一个 observable 创建的闭包中的方法,那么disposed
将设置为true
并且一切都很好。
但是如果第二个 observable 是在下一次执行第一个闭包之前创建的,StartAsync
disposed
则再次将其设置为 false 并且s.CloseStream();
永远不会被调用。
下面是 observable 的创建:
using System;
using System.Configuration;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using LinqToTwitter;
namespace TwitterApi
{
public class Twitter
{
private readonly SingleUserAuthorizer _auth = new SingleUserAuthorizer
{
CredentialStore = new InMemoryCredentialStore
{
ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
OAuthToken = ConfigurationManager.AppSettings["authtoken"],
OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
}
};
private readonly TwitterContext _twitterCtx;
public Twitter()
{
if (String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerKey)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerSecret)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthToken)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthTokenSecret))
throw new Exception("User Credentials are not set. Please update your App.config file.");
_twitterCtx = new TwitterContext(_auth);
}
public IObservable<Tweet> AllTweetsAbout(string topic)
{
return Observable.Create<Tweet>(o =>
{
var query = from s in _twitterCtx.Streaming
where s.Type == StreamingType.Filter &&
s.Track == topic
select s;
var disposed = false;
query.StartAsync(async s =>
{
if (disposed)
s.CloseStream();
else
{
Tweet t;
if (Tweet.TryParse(s.Content, topic, out t))
{
o.OnNext(t);
}
}
});
return Disposable.Create(() => disposed = true);
});
}
}
}
最后是Tweet
课程:
using System;
using Newtonsoft.Json.Linq;
namespace TwitterApi
{
public class Tweet
{
public string User { get; private set; }
public string Text { get; private set; }
public string Topic { get; private set; }
public static bool TryParse(string json, string topic, out Tweet tweet)
{
try
{
dynamic parsed = JObject.Parse(json);
tweet = new Tweet
{
User = parsed.user.screen_name,
Text = parsed.text,
Topic = topic,
};
return true;
}
catch (Exception)
{
tweet = null;
return false;
}
}
private Tweet()
{
}
}
}