1

该程序是用 C# 编写的。它从事件中心依次获取数据,并进入 sql db。

为了提高性能,我使 sql 插入异步。

在我的本地机器 8 GB 内存和 4 核中,异步版本的代码执行速度比同步版本快。

在开发服务器/环境中(单核,1.75 GB)。异步版本的执行速度比同步版本慢。

我扩大了开发服务器/环境(2 核,3.5 GB)。异步版本有所改进,但仍然比同步版本慢。

我的理解不考虑 CPU 内核和内存大小,异步版本应该比同步版本执行得更好。

你对此有何看法?

代码片段

// 将为从事件中心拉出的每个事件调用 ManageGeoSpetialData 方法。

public class EventConsumer : IEventConsumer
{
    private static readonly ILog4NetService Log = new Log4NetService(MethodBase.GetCurrentMethod().DeclaringType);
    private readonly IConsumerFactory _consumerFactory;
    private readonly IPublisherFactory _publisherFactory;
    private readonly IDataAccessLayer _dataAccess;
    private IPublisher _ehPublisher;

    public EventConsumer(IConsumerFactory consumerFactory, IPublisherFactory publisherFactory, IDataAccessLayer dataAccess)
    {
        _consumerFactory = consumerFactory;
        _publisherFactory = publisherFactory;
        _dataAccess = dataAccess;
    }

    public void Process()
    {
        Log.Info("CheckPointLogic Process Called ");
        try
        {
            ManageGeoSpetialLogic("Eventhub","Eventhub");
        }
        catch (Exception ex)
        {
            Log.Error("Error in CheckPointLogic Process Method : " + ex.Message);
        }
    }

    private void ManageGeoSpetialLogic(string consumerName, string publisherName)
    {
        Log.Info("Manage CheckPointLogic Called with Consumer : " + consumerName);
        _ehPublisher = _publisherFactory.Get(publisherName);
        var consumer = _consumerFactory.Get(consumerName);
        consumer.Consume(ManageGeoSpetialData);
        Log.Info("Consumer Method called ");
    }

    public async void ManageGeoSpetialData(object data)
    {
            try
            {
                _ehPublisher = _ehPublisher ?? _publisherFactory.Get(Apps.Terra.Messaging.Publisher.Constants.PublisherTypes.Eventhub);
                GeoBoundaryEvent geoBoundaryInfo = null;
                object transactionID = new object();
                if (data is EventData eventInfo)
                {
                    var value = Encoding.UTF8.GetString(eventInfo.Body.ToArray());
                    geoBoundaryInfo = JsonConvert.DeserializeObject<GeoBoundaryEvent>(value);
                    geoBoundaryInfo.SetEventType();
                    eventInfo.Properties.TryGetValue(EventProperties.TransactionIdProperty, out transactionID);
                }

                if (geoBoundaryInfo != null)
                {
                    geoBoundaryInfo.AssetLocationTimestamp = DateTime.ParseExact(geoBoundaryInfo.AssetLocationTimestamp.ToString("yyyy-MM-dd HH:mm:ss.fff"), "yyyy-MM-dd HH:mm:ss.fff", null);
                    geoBoundaryInfo.AssetLocationTimestamp = DateTime.SpecifyKind(geoBoundaryInfo.AssetLocationTimestamp, DateTimeKind.Utc);

                    // geoBoundaryInfo.AssetGeofenceID = _dataAccess.AddGeofenceEventInfo(geoBoundaryInfo); //Db Call

                    geoBoundaryInfo.AssetGeofenceID = await _dataAccess.AddGeofenceEventInfoAsync(geoBoundaryInfo);

                    EventData eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(geoBoundaryInfo)));
                    _ehPublisher.Publish(eventData);   //EH publish
                }
            }
            catch (Exception ex)
            {
                Log.Error($"Error in ManageGeoSpetialData {ex.Message}");
            }
    }
}


//Business layer method
public async Task<long> AddGeofenceEventInfoAsync(GeoBoundaryEvent geoBoundaryEvent)
    {
        //Need to change query for update
        var query = @"INSERT INTO AssetGeofence(AssetType,AssetDSN,AssetLocationDatetime,AlertDateTime,GeoBoundaryAssetType,GeoBoundaryAssetDSN,fk_EventTypeID,GeoFenceName,GeoFenceID,IsActive) Output Inserted.AssetGeofenceID values 
                     (@AssetType,@AssetDeviceSerialNumber,@AssetLocationTimestamp,@AlertTimestamp,@GeoBoundaryAssetType,@GeoBoundaryAssetDeviceSerialNumber,@fk_EventTypeID,@GeoFenceName,@GeoFenceId,1);";
        var task = _dbConnection.ExecuteQueryAsync<long>(query, geoBoundaryEvent);
        var result = await task;
        return result[0];


    }

//Data layer method
    public async Task<List<T>> ExecuteQueryAsync<T>(string sql, object param)
    {
         using (var connection = GetConnection())
         {
            connection.Open();
            var task = connection.QueryAsync<T>(sql, param, commandTimeout: 100);
            await task;
            var result = task.GetAwaiter().GetResult().ToList();
            connection.Close();
            return result;
        }
    }
4

0 回答 0