7

我的目标是从数据源中检索数据,向其中添加一些元数据并将其插入另一个目标。

目标的架构比源多四列(计算列)。

我正在使用SqlBulkCopy,它需要一个具有所有列(包括计算的 4 个)的阅读器。

有没有办法手动向 DataReader 添加列?如果不行,还有哪些其他方式可以完成数据插入?

4

5 回答 5

2

DataReader 是只读结构,因此无法修改。您可以改用DataTable

于 2013-08-29T06:55:19.697 回答
2

这是可以做到的:

  • 创建您自己的实现 IDataReader 接口的类
  • 在类构造函数中添加现有的 DataReader
  • 根据需要实现接口成员:要么转发并返回底层 DataReader 的结果,要么返回您自己计算的值

仅供参考,下面是一个简单的实现示例(我省略了大多数方法):

/// <summary>
/// Sketch of an <see cref="IDataReader"/> decorator: it forwards calls to an existing reader
/// and additionally exposes one computed string column named "displayName".
/// Only a handful of interface members are shown; the remaining ones are omitted for brevity.
/// </summary>
public class WrapperDataReader : IDataReader
{
    // Name and ordinal of the computed column, declared once so that
    // GetOrdinal and GetString cannot drift apart.
    private const string DisplayNameColumn = "displayName";
    private const int DisplayNameOrdinal = 15;

    // Ordinals of the source columns the computed value is built from.
    private const int LastNameOrdinal = 1;
    private const int FirstNameOrdinal = 2;

    private readonly IDataReader reader;

    /// <summary>Wraps <paramref name="reader"/>; all non-computed members forward to it.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
    public WrapperDataReader(IDataReader reader)
    {
        if (reader == null)
        {
            throw new ArgumentNullException(nameof(reader));
        }

        this.reader = reader;
    }

    /// <summary>Closes the wrapped reader.</summary>
    public void Close()
    {
        reader.Close();
    }

    /// <summary>Nesting depth reported by the wrapped reader.</summary>
    public int Depth
    {
        get { return reader.Depth; }
    }

    /// <summary>
    /// Returns the wrapped reader's schema table with the computed column appended.
    /// </summary>
    public DataTable GetSchemaTable()
    {
        var schemaTable = reader.GetSchemaTable();
        // add your computed column to the schema table
        schemaTable.Rows.Add(...);
        return schemaTable;
    }

    /// <summary>Forwards to the wrapped reader.</summary>
    public bool GetBoolean(int i)
    {
        return reader.GetBoolean(i);
    }

    /// <summary>
    /// Resolves the computed column by name (case-insensitively); every other name is
    /// resolved by the wrapped reader.
    /// </summary>
    public int GetOrdinal(string name)
    {
        // Static string.Equals is null-safe; a null name simply falls through to the wrapped reader.
        if (string.Equals(name, DisplayNameColumn, StringComparison.OrdinalIgnoreCase))
        {
            return DisplayNameOrdinal;
        }

        return reader.GetOrdinal(name);
    }

    /// <summary>
    /// Returns "lastname, firstname" for the computed column; otherwise forwards to the wrapped reader.
    /// </summary>
    public string GetString(int i)
    {
        if (i == DisplayNameOrdinal)
        {
            return String.Format("{0}, {1}", GetString(LastNameOrdinal), GetString(FirstNameOrdinal)); // lastname, firstname
        }

        return reader.GetString(i);
    }

}

更新

由于您正在使用 WriteToServer 方法,因此您可以使用采用 DataTable 的重载。

        var sourceConnectionString = "...";
        var connectionString = "...";
        var table = new DataTable();

        // SqlDataReader has no public constructor: a reader is obtained by executing
        // a command against the source. Reader, command and connection are disposed
        // as soon as the rows have been buffered into the DataTable.
        using (var sourceConnection = new SqlConnection(sourceConnectionString))
        using (var sourceCommand = new SqlCommand("...", sourceConnection))
        {
            sourceConnection.Open();
            using (var reader = sourceCommand.ExecuteReader())
            {
                table.Load(reader);
            }
        }

        // Expression column: DataColumn expressions concatenate strings with '+'
        // and quote string literals with single quotes.
        table.Columns.Add("DisplayName", typeof(string), "lastname + ', ' + firstname");

        // Column whose value is computed in code, row by row.
        table.Columns.Add("CustomerCode", typeof(string));

        foreach (DataRow row in table.Rows)
        {
            row["CustomerCode"] = ((int)row["id"] + 10000).ToString();
        }

        // SqlBulkCopy is IDisposable; the using block releases it once the copy has finished.
        using (var copy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.Default))
        {
            copy.DestinationTableName = "Customers";
            copy.WriteToServer(table);
        }
于 2013-08-29T06:56:33.237 回答
1

Datareader 仅用于读取数据。您不能修改其架构或值

Dataset/DataTable 就是为此目的。

于 2013-08-29T06:55:51.357 回答
1

我也需要这样做,并且还要能够基于其他列创建新列,以及让列的值取决于读取器的行索引。如果这对您有用(它对我有用),下面这个类实现了 Dapper 的 IWrappedDataReader 接口。该类并不完整,因为我没有为所有 IDataRecord 成员实现附加列的支持,但您可以参考 IDataRecord.GetInt32 的实现来了解这个简单的模式。

/// <summary>
/// Decorates an <see cref="IDataReader"/> with extra, computed columns that are appended
/// after the columns of the underlying reader. Each extra column is described by an
/// <see cref="AdditionalField"/> whose delegate receives the zero-based index of the
/// current row and the underlying reader.
/// </summary>
/// <remarks>
/// <see cref="IDataRecord.GetBytes"/>, <see cref="IDataRecord.GetChars"/> and
/// <see cref="IDataRecord.GetData"/> are forwarded unchanged and therefore only work for
/// ordinals of the underlying reader.
/// </remarks>
public class WrappedDataReader : IDataReader, IWrappedDataReader
{
    private readonly IList<AdditionalField> _additionalFields;

    // Number of columns exposed by the underlying reader; additional fields start at this ordinal.
    private readonly int _originalOrdinalCount;
    private IDbCommand _cmd;
    private int _currentRowIndex = -1; // The first successful Read() makes this 0.
    private IDataReader _underlyingReader;

    /// <summary>Creates the wrapper.</summary>
    /// <param name="underlyingReader">Reader that supplies the original columns; disposed together with this instance.</param>
    /// <param name="additionalFields">Computed columns, appended in list order after the original columns.</param>
    /// <param name="cmd">
    /// Optional command that produced <paramref name="underlyingReader"/>. When supplied it is
    /// exposed through <c>IWrappedDataReader.Command</c> and disposed together with this instance.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="underlyingReader"/> or <paramref name="additionalFields"/> is null.
    /// </exception>
    public WrappedDataReader(IDataReader underlyingReader, IList<AdditionalField> additionalFields, IDbCommand cmd = null)
    {
        if (underlyingReader == null)
        {
            throw new ArgumentNullException(nameof(underlyingReader));
        }

        if (additionalFields == null)
        {
            throw new ArgumentNullException(nameof(additionalFields));
        }

        _additionalFields = additionalFields;
        _underlyingReader = underlyingReader;
        _cmd = cmd;

        // Use FieldCount rather than the schema row count: FieldCount is what this class adds
        // the additional fields to, so the two numbers must agree for ordinals to line up.
        _originalOrdinalCount = underlyingReader.FieldCount;
    }

    /// <summary>Value of the column at ordinal <paramref name="i"/> (original or additional).</summary>
    public object this[int i]
    {
        get { return ((IDataRecord) this).GetValue(i); }
    }

    /// <summary>The underlying reader.</summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public IDataReader Reader
    {
        get
        {
            if (_underlyingReader == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            return _underlyingReader;
        }
    }

    /// <summary>The command passed to the constructor.</summary>
    /// <exception cref="ObjectDisposedException">No command is held (none was supplied, or this instance has been disposed).</exception>
    IDbCommand IWrappedDataReader.Command
    {
        get
        {
            if (_cmd == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            return _cmd;
        }
    }

    void IDataReader.Close() => _underlyingReader?.Close();

    int IDataReader.Depth => Reader.Depth;

    /// <summary>
    /// Returns the underlying schema table with one row appended per additional field
    /// (ColumnName, ColumnOrdinal and DataType are filled in).
    /// </summary>
    DataTable IDataReader.GetSchemaTable()
    {
        var rv = Reader.GetSchemaTable();
        if (rv == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        for (var i = 0; i < _additionalFields.Count; i++)
        {
            var row = rv.NewRow();

            row["ColumnName"] = _additionalFields[i].ColumnName;
            row["ColumnOrdinal"] = GetAppendColumnOrdinal(i);
            row["DataType"] = _additionalFields[i].DataType;

            rv.Rows.Add(row);
        }

        return rv;
    }

    bool IDataReader.IsClosed => _underlyingReader?.IsClosed ?? true;

    // NOTE(review): the row index and the original column count are not re-evaluated
    // for a following result set - confirm whether multi-result readers must be supported.
    bool IDataReader.NextResult() => Reader.NextResult();

    /// <summary>Advances the underlying reader; the row index only moves when a row was actually read.</summary>
    bool IDataReader.Read()
    {
        var hasRow = Reader.Read();
        if (hasRow)
        {
            _currentRowIndex++;
        }

        return hasRow;
    }

    int IDataReader.RecordsAffected => Reader.RecordsAffected;

    /// <summary>Closes and disposes the underlying reader and the command, if any.</summary>
    void IDisposable.Dispose()
    {
        _underlyingReader?.Close();
        _underlyingReader?.Dispose();
        _underlyingReader = null;
        _cmd?.Dispose();
        _cmd = null;
    }

    int IDataRecord.FieldCount => Reader.FieldCount + _additionalFields.Count;

    bool IDataRecord.GetBoolean(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<bool>(i) : Reader.GetBoolean(i);

    byte IDataRecord.GetByte(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<byte>(i) : Reader.GetByte(i);

    long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) =>
        Reader.GetBytes(i, fieldOffset, buffer, bufferoffset, length);

    char IDataRecord.GetChar(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<char>(i) : Reader.GetChar(i);

    long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) =>
        Reader.GetChars(i, fieldoffset, buffer, bufferoffset, length);

    IDataReader IDataRecord.GetData(int i) => Reader.GetData(i);

    // Additional fields have no database type; the CLR type name is reported instead.
    string IDataRecord.GetDataTypeName(int i) =>
        IsAdditionalField(i) ? _additionalFields[GetAppendColumnIndex(i)].DataType?.Name : Reader.GetDataTypeName(i);

    DateTime IDataRecord.GetDateTime(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<DateTime>(i) : Reader.GetDateTime(i);

    decimal IDataRecord.GetDecimal(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<decimal>(i) : Reader.GetDecimal(i);

    double IDataRecord.GetDouble(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<double>(i) : Reader.GetDouble(i);

    Type IDataRecord.GetFieldType(int i) =>
        IsAdditionalField(i) ? _additionalFields[GetAppendColumnIndex(i)].DataType : Reader.GetFieldType(i);

    float IDataRecord.GetFloat(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<float>(i) : Reader.GetFloat(i);

    Guid IDataRecord.GetGuid(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<Guid>(i) : Reader.GetGuid(i);

    short IDataRecord.GetInt16(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<short>(i) : Reader.GetInt16(i);

    int IDataRecord.GetInt32(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<int>(i) : Reader.GetInt32(i);

    long IDataRecord.GetInt64(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<long>(i) : Reader.GetInt64(i);

    string IDataRecord.GetName(int i) =>
        IsAdditionalField(i) ? _additionalFields[GetAppendColumnIndex(i)].ColumnName : Reader.GetName(i);

    /// <summary>
    /// Resolves additional fields first (case-insensitive, ordinal comparison), then falls
    /// back to the underlying reader.
    /// </summary>
    int IDataRecord.GetOrdinal(string name)
    {
        for (var i = 0; i < _additionalFields.Count; i++)
        {
            // Static string.Equals tolerates a null name; it then falls through to the underlying reader.
            if (string.Equals(name, _additionalFields[i].ColumnName, StringComparison.OrdinalIgnoreCase))
            {
                return GetAppendColumnOrdinal(i);
            }
        }

        return Reader.GetOrdinal(name);
    }

    string IDataRecord.GetString(int i) =>
        IsAdditionalField(i) ? GetAdditionalFieldValue<string>(i) : Reader.GetString(i);

    object IDataRecord.GetValue(int i) =>
        IsAdditionalField(i) ? ExecuteAdditionalFieldFunc(i) : Reader.GetValue(i);

    /// <summary>
    /// Fills <paramref name="values"/> with the original columns followed by the additional
    /// fields, as far as the array has room, and returns the number of entries written.
    /// </summary>
    int IDataRecord.GetValues(object[] values)
    {
        // The underlying reader fills the leading slots (and validates the array).
        var copied = Reader.GetValues(values);

        var limit = Math.Min(values.Length, _originalOrdinalCount + _additionalFields.Count);
        for (var ordinal = _originalOrdinalCount; ordinal < limit; ordinal++)
        {
            values[ordinal] = ExecuteAdditionalFieldFunc(ordinal);
            copied++;
        }

        return copied;
    }

    /// <summary>
    /// For additional fields, true when the computed value is null or <see cref="DBNull"/>;
    /// otherwise forwarded to the underlying reader.
    /// </summary>
    bool IDataRecord.IsDBNull(int i)
    {
        if (!IsAdditionalField(i))
        {
            return Reader.IsDBNull(i);
        }

        var value = ExecuteAdditionalFieldFunc(i);
        return value == null || value is DBNull;
    }

    object IDataRecord.this[string name]
    {
        get
        {
            var ordinal = ((IDataRecord) this).GetOrdinal(name);
            return ((IDataRecord) this).GetValue(ordinal);
        }
    }

    // True when the ordinal addresses one of the appended, computed columns.
    private bool IsAdditionalField(int ordinal)
    {
        return ordinal >= _originalOrdinalCount;
    }

    // Maps an index into _additionalFields to the ordinal exposed by this reader.
    private int GetAppendColumnOrdinal(int index)
    {
        return _originalOrdinalCount + index;
    }

    // Maps an ordinal exposed by this reader back to an index into _additionalFields.
    private int GetAppendColumnIndex(int ordinal)
    {
        return ordinal - _originalOrdinalCount;
    }

    // Computes an additional field and casts (unboxes) it to the requested type.
    private T GetAdditionalFieldValue<T>(int ordinal)
    {
        return (T) ExecuteAdditionalFieldFunc(ordinal);
    }

    // Evaluates the delegate of the additional field at the given ordinal for the current row.
    // A field declared without a delegate (the constructor default) yields null.
    private object ExecuteAdditionalFieldFunc(int ordinal)
    {
        var func = _additionalFields[GetAppendColumnIndex(ordinal)].Func;
        return func == null ? null : func(_currentRowIndex, Reader);
    }

    /// <summary>Describes one computed column appended by <see cref="WrappedDataReader"/>.</summary>
    public struct AdditionalField
    {
        /// <param name="columnName">Name reported for the column.</param>
        /// <param name="dataType">Type reported for the column in the schema table.</param>
        /// <param name="func">
        /// Computes the value; receives the zero-based index of the current row and the underlying reader.
        /// </param>
        public AdditionalField(string columnName, Type dataType, Func<int, IDataReader, object> func = null)
        {
            ColumnName = columnName;
            DataType = dataType;
            Func = func;
        }

        public string ColumnName;
        public Type DataType;
        public Func<int, IDataReader, object> Func;
    }
}

这是我写的一个快速 NUnit 测试来展示它的用法

/// <summary>
/// Reads three rows through a <see cref="WrappedDataReader"/> with three additional fields
/// (a constant, the row index and a value derived from the original column) and compares the
/// JSON rendering of the result with the expected document.
/// </summary>
[Test]
public void ReaderTest()
{
    using (var conn = new SqlConnection(ConnectionSettingsCollection.Default))
    {
        conn.Open();

        // ORDER BY makes the row order (and therefore CounterField) deterministic;
        // UNION alone gives no ordering guarantee.
        const string sql = @"
            SELECT 1 as OriginalField
            UNION
            SELECT -500 as OriginalField
            UNION
            SELECT 100 as OriginalField
            ORDER BY OriginalField
        ";

        var additionalFields = new[]
        {
            // StaticField yields the string "X", so it is declared as a string column.
            new WrappedDataReader.AdditionalField("StaticField", typeof(string), delegate { return "X"; }),
            new WrappedDataReader.AdditionalField("CounterField", typeof(int), (i, reader) => i),
            new WrappedDataReader.AdditionalField("ComputedField", typeof(int), (i, reader) => (int) reader["OriginalField"] + 1000)
        };

        const string expectedJson = @"
            [
                {""OriginalField"":-500,""StaticField"":""X"",""CounterField"":0,""ComputedField"":500},
                {""OriginalField"":1,   ""StaticField"":""X"",""CounterField"":1,""ComputedField"":1001},
                {""OriginalField"":100, ""StaticField"":""X"",""CounterField"":2,""ComputedField"":1100}
            ]
        ";

        string actualJson;

        // Dispose the command and the wrapped reader before the connection is closed.
        using (var cmd = new SqlCommand(sql, conn))
        using (var wrapped = new WrappedDataReader(cmd.ExecuteReader(), additionalFields))
        {
            actualJson = ToJson(wrapped);
        }

        // IgnoreSymbols makes the comparison insensitive to whitespace and punctuation differences.
        Assert.Zero(CultureInfo.InvariantCulture.CompareInfo.Compare(expectedJson, actualJson, CompareOptions.IgnoreSymbols));
    }
}

/// <summary>
/// Renders the remaining rows of <paramref name="reader"/> as a JSON array containing one
/// object per row, with one property per column (column name to column value).
/// </summary>
/// <param name="reader">Reader to consume; it is read to the end but not closed here.</param>
/// <returns>The JSON text.</returns>
private static string ToJson(IDataReader reader)
{
    using (var output = new StringWriter(new StringBuilder()))
    using (var json = new JsonTextWriter(output))
    {
        json.WriteStartArray();

        for (var hasRow = reader.Read(); hasRow; hasRow = reader.Read())
        {
            json.WriteStartObject();

            var ordinal = 0;
            while (ordinal < reader.FieldCount)
            {
                var columnName = reader.GetName(ordinal);
                json.WritePropertyName(columnName);

                var columnValue = reader[ordinal];
                json.WriteValue(columnValue);

                ordinal++;
            }

            json.WriteEndObject();
        }

        json.WriteEndArray();

        var serialized = output.ToString();
        return serialized;
    }
}
于 2018-09-10T05:06:56.397 回答
0

我找到了另一种可能的解决方案,我目前正在使用:

  1. 从现有的 DataReader 创建以下对象:
    IEnumerable<object[]> —— 代表数据
    List<Column> —— 代表 DataReader 中数据的列

  2. 修改数据并添加额外的列

  3. 创建一个方法 AsDataReader(IEnumerable<object[]> updatedData, List<Column> updatedColumns),它接收更新后的对象并返回修改后的 DataReader

    • 我使用 yield return 来提高性能。
于 2013-10-11T16:54:56.130 回答