6

大家都知道 Pig 支持DBStorage,但他们只支持从 Pig 到 mysql 这样的加载结果

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');

但是请告诉我从mysql中读取表的方法

data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');

这是我的代码

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    private ArrayList mProtoTuple = null;
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // TODO Auto-generated method stub
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // TODO Auto-generated method stub
        boolean next = false;

        try {
            next = result.next();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        if (!next)
            return null;
        int numColumns = 0;
        // Get result set meta data
        ResultSetMetaData rsmd;
        try {
            rsmd = result.getMetaData();
            numColumns = rsmd.getColumnCount();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        for (int i = 0; i < numColumns; i++) {

            try {
                Object field = result.getObject(i);

                switch (DataType.findType(field)) {
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot store a non-flat tuple "
                            + "using DbStorage");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                }

            } catch (Exception ee) {
                throw new RuntimeException(ee);
            }
        }

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {

        con = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // TODO Auto-generated method stub

        //TextInputFormat.setInputPaths(job, location);

    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

    }

}

我尝试了很多次写UDF但没有成功.....

4

1 回答 1

2

就像你说的,DBStorage只支持将结果保存到数据库。

要从 MySQL 加载数据,您可以查看一个名为sqoop的项目(将数据从数据库复制到 HDFS),或者您可以执行 mysql 转储,然后将文件复制到 HDFS。这两种方式都需要一些交互,并且不能在 Pig 内部直接使用。

第三种选择是考虑编写 Pig LoadFunc(您说您尝试编写 UDF)。这应该不会太难,您需要传递与 DBStorage 相同的选项(驱动程序、连接凭据和要执行的 SQL 查询),并且您也可以使用一些结果集元数据检查来自动生成模式。

于 2012-06-08T10:39:59.467 回答