我正在尝试找到一种将对象传递到 Azure 队列的方法。我找不到办法做到这一点。
正如我所见,我可以传递字符串或字节数组,这对于传递对象来说不是很舒服。
反正有没有将自定义对象传递给队列?
谢谢!
我正在尝试找到一种将对象传递到 Azure 队列的方法。我找不到办法做到这一点。
正如我所见,我可以传递字符串或字节数组,这对于传递对象来说不是很舒服。
反正有没有将自定义对象传递给队列?
谢谢!
您可以使用以下类作为示例:
[Serializable]
public abstract class BaseMessage
{
public byte[] ToBinary()
{
BinaryFormatter bf = new BinaryFormatter();
byte[] output = null;
using (MemoryStream ms = new MemoryStream())
{
ms.Position = 0;
bf.Serialize(ms, this);
output = ms.GetBuffer();
}
return output;
}
public static T FromMessage<T>(CloudQueueMessage m)
{
byte[] buffer = m.AsBytes;
T returnValue = default(T);
using (MemoryStream ms = new MemoryStream(buffer))
{
ms.Position = 0;
BinaryFormatter bf = new BinaryFormatter();
returnValue = (T)bf.Deserialize(ms);
}
return returnValue;
}
}
然后是一个 StdQueue(一个强类型的队列):
public class StdQueue<T> where T : BaseMessage, new()
{
protected CloudQueue queue;
public StdQueue(CloudQueue queue)
{
this.queue = queue;
}
public void AddMessage(T message)
{
CloudQueueMessage msg =
new CloudQueueMessage(message.ToBinary());
queue.AddMessage(msg);
}
public void DeleteMessage(CloudQueueMessage msg)
{
queue.DeleteMessage(msg);
}
public CloudQueueMessage GetMessage()
{
return queue.GetMessage(TimeSpan.FromSeconds(120));
}
}
然后,你所要做的就是继承BaseMessage:
[Serializable]
public class ParseTaskMessage : BaseMessage
{
public Guid TaskId { get; set; }
public string BlobReferenceString { get; set; }
public DateTime TimeRequested { get; set; }
}
并创建一个与该消息一起使用的队列:
CloudStorageAccount acc;
if (!CloudStorageAccount.TryParse(connectionString, out acc))
{
throw new ArgumentOutOfRangeException("connectionString", "Invalid connection string was introduced!");
}
CloudQueueClient clnt = acc.CreateCloudQueueClient();
CloudQueue queue = clnt.GetQueueReference(processQueue);
queue.CreateIfNotExist();
this._queue = new StdQueue<ParseTaskMessage>(queue);
希望这可以帮助!
使用 Newtonsoft.Json 和 async 的扩展方法
public static async Task AddMessageAsJsonAsync<T>(this CloudQueue cloudQueue, T objectToAdd)
{
var messageAsJson = JsonConvert.SerializeObject(objectToAdd);
var cloudQueueMessage = new CloudQueueMessage(messageAsJson);
await cloudQueue.AddMessageAsync(cloudQueueMessage);
}
我喜欢这种泛化方法,但我不喜欢将 Serialize 属性放在我可能想要放入消息中的所有类上并从基类派生它们(我可能也已经有一个基类)所以我使用...
using System;
using System.Text;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
namespace Example.Queue
{
public static class CloudQueueMessageExtensions
{
public static CloudQueueMessage Serialize(Object o)
{
var stringBuilder = new StringBuilder();
stringBuilder.Append(o.GetType().FullName);
stringBuilder.Append(':');
stringBuilder.Append(JsonConvert.SerializeObject(o));
return new CloudQueueMessage(stringBuilder.ToString());
}
public static T Deserialize<T>(this CloudQueueMessage m)
{
int indexOf = m.AsString.IndexOf(':');
if (indexOf <= 0)
throw new Exception(string.Format("Cannot deserialize into object of type {0}",
typeof (T).FullName));
string typeName = m.AsString.Substring(0, indexOf);
string json = m.AsString.Substring(indexOf + 1);
if (typeName != typeof (T).FullName)
{
throw new Exception(string.Format("Cannot deserialize object of type {0} into one of type {1}",
typeName,
typeof (T).FullName));
}
return JsonConvert.DeserializeObject<T>(json);
}
}
}
例如
var myobject = new MyObject();
_queue.AddMessage( CloudQueueMessageExtensions.Serialize(myobject));
var myobject = _queue.GetMessage().Deserialize<MyObject>();
如果存储队列与 WebJob 或 Azure 函数一起使用(非常常见的场景),那么当前的 Azure SDK 允许直接使用 POCO 对象。请参阅此处的示例:
注意:SDK 会在后台自动使用 Newtonsoft.Json 进行序列化/反序列化。
我喜欢@Akodo_Shado 使用Newtonsoft.Json
. 我对其进行了更新,Azure.Storage.Queues
还添加了一个“检索和删除”方法,用于从队列中反序列化对象。
public static class CloudQueueExtensions
{
public static async Task AddMessageAsJsonAsync<T>(this QueueClient queueClient, T objectToAdd) where T : class
{
string messageAsJson = JsonConvert.SerializeObject(objectToAdd);
BinaryData cloudQueueMessage = new BinaryData(messageAsJson);
await queueClient.SendMessageAsync(cloudQueueMessage);
}
public static async Task<T> RetreiveAndDeleteMessageAsObjectAsync<T>(this QueueClient queueClient) where T : class
{
QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync(1);
if (retrievedMessage.Length == 0) return null;
string theMessage = retrievedMessage[0].MessageText;
T instanceOfT = JsonConvert.DeserializeObject<T>(theMessage);
await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
return instanceOfT;
}
}
RetreiveAndDeleteMessageAsObjectAsync
旨在一次处理 1 条消息,但您显然可以重写以反序列化完整的消息数组并返回一个或ICollection<T>
类似的。
这不是正确的做法。队列不用于存储对象。您需要将对象放在 blob 或表中(序列化)。我相信 queue messgae body 有 64kb 大小限制,sdk1.5 和 8kb 较低版本。Messgae 的主体是为只接收它的工人传输关键数据。