0

我已经使用libpuzzle分析了 300 万张图像。200 万来自我的主服务器,100 万来自另一个服务器。我想将这些信息合并到 1 个 MySQL 数据库中。

我需要在test_images_pending数据库中获取记录并将它们插入,test_images但我必须以没有重复数据的方式进行。

test_images在所有表中总共有 1.15 亿条记录,单词本身就有 1.1 亿条记录。大小 ~4.4 GB

test_images_pending分别有6900万和6500万。大小 ~2.6 GB

我的计算机上有 8GB 内存,如果需要,我愿意将所有内容(或尝试)加载到内存中,以加快速度。

我希望对我的代码和/或技术进行一些优化以使 MySQL 更快,我可以将速率从每秒大约 2 张图片(来自 test_images_pending.picture 表)提高到更易于管理的东西。最起码是每秒 100 张图片。

test_images这是和的表设置test_images_pending

--
-- Table structure for table `errors`
--

CREATE TABLE IF NOT EXISTS `errors` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `url` varchar(255) NOT NULL,
  `num` int(11) NOT NULL,
  `pid` bigint(20) unsigned NOT NULL,
  `error` varchar(512) NOT NULL,
  `datetime` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=245688 ;

-- --------------------------------------------------------

--
-- Table structure for table `pictures`
--

CREATE TABLE IF NOT EXISTS `pictures` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `digest` char(32) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_digest` (`digest`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=1107725 ;

-- --------------------------------------------------------

--
-- Table structure for table `signatures`
--

CREATE TABLE IF NOT EXISTS `signatures` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `compressed_signature` varchar(338) NOT NULL,
  `picture_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `picture_id` (`picture_id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=1107725 ;

-- --------------------------------------------------------

--
-- Table structure for table `stored_pictures`
--

CREATE TABLE IF NOT EXISTS `stored_pictures` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `url` varchar(255) NOT NULL,
  `pid` bigint(20) unsigned NOT NULL,
  `num` int(11) NOT NULL,
  `updated_at` datetime DEFAULT NULL,
  `created_at` datetime DEFAULT NULL,
  `picture_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_url` (`url`),
  KEY `idx_picture_id` (`picture_id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=2773867 ;

-- --------------------------------------------------------

--
-- Table structure for table `words`
--

CREATE TABLE IF NOT EXISTS `words` (
  `pos_and_word` char(5) NOT NULL,
  `signature_id` int(11) NOT NULL,
  KEY `idx_pos_and_word` (`pos_and_word`),
  KEY `signature_id` (`signature_id`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
--

这是我正在运行的 php PDO 代码:

<html>
<head>
    <link href="../css/print.css" rel="stylesheet" type="text/css" media="print" /> <!-- siehe screen.css -->
    <link href="../css/screen.css" rel="stylesheet" type="text/css" media="screen, projection" /> 
    <!--[if lte IE 6]><link rel="stylesheet" href="../css/ielte6.css" type="text/css" media="screen" /><![endif]--> 
</head>
<body>
<?php
    ini_set('max_execution_time', 0);

    $dbh = new PDO("mysql:host=127.0.0.1;port=3306;dbname=test_images_pending;charset=utf-8", "root", "");
    $dbh->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION );
    $dbh->setAttribute(PDO::ATTR_AUTOCOMMIT, FALSE);


    try {
        $query = "select id,digest from test_images_pending.pictures"; 
        $sth = $dbh->prepare($query);
        $sth->execute();

        while ($pending_pictures_rows = $sth->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {

            // Print out what id it's on.
            print $pending_pictures_rows['id']."<br>";
            buffer_flush();

            try {
                $dbh->beginTransaction(); 

                $query = "SELECT COUNT(id) from test_images.pictures WHERE digest = :digest";
                $sth1 = $dbh->prepare($query);
                $sth1->bindParam(':digest', $pending_pictures_rows['digest']);
                $sth1->execute();

                $count = $sth1->fetchColumn();

                if ($count == 1) {



                    $query = "SELECT id from test_images.pictures WHERE digest = :digest";
                    $sth2 = $dbh->prepare($query);
                    $sth2->bindParam(':digest', $pending_pictures_rows['digest']);
                    $sth2->execute();

                    $correct_pic_id = $sth2->fetchColumn();

                    if(!isset($correct_pic_id) or empty($correct_pic_id)) {
                        throw new PDOException('correct_pic_id was empty');
                    }

                    $query = "select * from test_images_pending.stored_pictures WHERE picture_id = :picture_id"; 
                    $sth3 = $dbh->prepare($query);
                    $sth3->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth3->execute();

                    while ($row = $sth3->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {

                        $query = "INSERT INTO test_images.stored_pictures 
                                    (id, url, pid, num, updated_at, created_at, picture_id) 
                                  VALUES 
                                    (default, :url, :pid, :num, :updated_at, :created_at, :picture_id);";

                        $sth4 = $dbh->prepare($query);
                        $sth4->bindParam(':url', $row['url']);
                        $sth4->bindParam(':pid', $row['pid']);
                        $sth4->bindParam(':num', $row['num']);
                        $sth4->bindParam(':updated_at', $row['updated_at']);
                        $sth4->bindParam(':created_at', $row['created_at']);
                        $sth4->bindParam(':picture_id', $correct_pic_id);
                        $sth4->execute();
                    }

                    $query = "DELETE FROM test_images_pending.stored_pictures WHERE picture_id = :picture_id;";
                    $sth5 = $dbh->prepare($query);
                    $sth5->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth5->execute();

                    $query = "select id from test_images_pending.signatures WHERE picture_id = :picture_id;"; 
                    $sth6 = $dbh->prepare($query);
                    $sth6->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth6->execute();

                    $signature_id = $sth6->fetchColumn();

                    if(!isset($signature_id) or empty($signature_id)) {
                        throw new PDOException('signature_id was empty');
                    }

                    $query = "DELETE FROM test_images_pending.words WHERE signature_id = :signature_id;"; 
                    $sth7 = $dbh->prepare($query);
                    $sth7->bindParam(':signature_id', $signature_id);
                    $sth7->execute();

                    $query = "DELETE FROM test_images_pending.signatures WHERE picture_id = :picture_id";        
                    $sth8 = $dbh->prepare($query);
                    $sth8->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth8->execute();

                    $query = "DELETE FROM test_images_pending.pictures WHERE digest = :digest";                                  
                    $sth9 = $dbh->prepare($query);
                    $sth9->bindParam(':digest', $pending_pictures_rows['digest']);
                    $sth9->execute();
                } else if ($count == 0){



                    $query = "INSERT INTO test_images.pictures
                                (id, digest) 
                              VALUES 
                                (default, :digest);";

                    $sth2 = $dbh->prepare($query);
                    $sth2->bindParam(':digest', $pending_pictures_rows['digest']);
                    $sth2->execute();

                    $new_pic_id = $dbh->lastInsertId();


                    $query = "select * from test_images_pending.stored_pictures WHERE picture_id = :picture_id"; 
                    $sth3 = $dbh->prepare($query);
                    $sth3->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth3->execute();

                    while ($row = $sth3->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {

                        $query = "INSERT INTO test_images.stored_pictures 
                                    (id, url, pid, num, updated_at, created_at, picture_id) 
                                  VALUES 
                                    (default, :url, :pid, :num, :updated_at, :created_at, :picture_id);";

                        $sth4 = $dbh->prepare($query);
                        $sth4->bindParam(':url', $row['url']);
                        $sth4->bindParam(':pid', $row['pid']);
                        $sth4->bindParam(':num', $row['num']);
                        $sth4->bindParam(':updated_at', $row['updated_at']);
                        $sth4->bindParam(':created_at', $row['created_at']);
                        $sth4->bindParam(':picture_id', $new_pic_id);
                        $sth4->execute();
                    }




                    $query = "DELETE FROM test_images_pending.stored_pictures WHERE picture_id = :picture_id;";
                    $sth5 = $dbh->prepare($query);
                    $sth5->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth5->execute();

                    $query = "select id,compressed_signature from test_images_pending.signatures WHERE picture_id = :picture_id;"; 
                    $sth6 = $dbh->prepare($query);
                    $sth6->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth6->execute();
                    $fetched = $sth6->fetch(PDO::FETCH_ASSOC);

                    $signature_id = $fetched['id'];

                    if(!isset($signature_id) or empty($signature_id)) {
                        print_r($sth6->fetch(PDO::FETCH_ASSOC));
                        throw new PDOException('signature_id was empty');
                    }

                    $compressed_signature = $fetched['compressed_signature'];

                    if(!isset($compressed_signature) or empty($compressed_signature)) {
                        print_r($sth6->fetch(PDO::FETCH_ASSOC));
                        throw new PDOException('compressed_signature was empty');
                    }

                    $query = "INSERT INTO test_images.signatures
                                (id, compressed_signature, picture_id)
                              VALUES
                                (default, :compressed_signature, :picture_id);";

                    $sth7 = $dbh->prepare($query);
                    $sth7->bindParam(':picture_id', $new_pic_id);
                    $sth7->bindParam(':compressed_signature', $compressed_signature);
                    $sth7->execute();

                    $new_sig_id = $dbh->lastInsertId();

                    $query = "SELECT pos_and_word FROM test_images_pending.words WHERE signature_id = :signature_id";  
                    $sth8 = $dbh->prepare($query);
                    $sth8->bindParam(':signature_id', $signature_id);
                    $sth8->execute();

                    while ($row = $sth8->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {

                        $query = "INSERT INTO test_images.words 
                                    (pos_and_word, signature_id)
                                  VALUES 
                                    (:pos_and_word, :signature_id);";

                        $sth9 = $dbh->prepare($query);
                        $sth9->bindParam(':pos_and_word', $row['pos_and_word']);
                        $sth9->bindParam(':signature_id', $new_sig_id);
                        $sth9->execute();
                    }

                    $query = "DELETE FROM test_images_pending.words WHERE signature_id = :signature_id;"; 
                    $sth10 = $dbh->prepare($query);
                    $sth10->bindParam(':signature_id', $signature_id);
                    $sth10->execute();

                    $query = "DELETE FROM test_images_pending.signatures WHERE picture_id = :picture_id";        
                    $sth11 = $dbh->prepare($query);
                    $sth11->bindParam(':picture_id', $pending_pictures_rows['id']);
                    $sth11->execute();

                    $query = "DELETE FROM test_images_pending.pictures WHERE digest = :digest";                                  
                    $sth12 = $dbh->prepare($query);
                    $sth12->bindParam(':digest', $pending_pictures_rows['digest']);
                    $sth12->execute();


                } else {
                    throw new PDOException("Found more than 1 match for the digest '{$pending_pictures_rows['digest']}' in 'test_images.pictures' ", $query);
                }

                $dbh->commit(); 
            } catch (PDOException $e) {
                $dbh->rollback(); 
                print "<pre>"; print_r($e); print "</pre>"; exit;
            }
        }

        try {

            $dbh->beginTransaction();

            $query = "SELECT * FROM test_images_pending.errors";
            $sth13 = $dbh->prepare($query);
            $sth13->execute();

            while ($row = $sth13->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {

                $query = "INSERT INTO test_images.errors 
                            (id, url, num, pid, error, datetime)
                          VALUES 
                            (default, :url, :num, :pid, :error, :datetime);";


                $sth14 = $dbh->prepare($query);
                $sth14->bindParam(':url', $row['url']);
                $sth14->bindParam(':num', $row['num']);
                $sth14->bindParam(':pid', $row['pid']);
                $sth14->bindParam(':error', $row['error']);
                $sth14->bindParam(':datetime', $row['datetime']);
                $sth14->execute();
            }

            $query = "DELETE FROM test_images_pending.errors WHERE 1";       
            $sth15 = $dbh->prepare($query);
            $sth15->execute();

            $dbh->commit(); 
        } catch (PDOException $e) {
            $dbh->rollback(); 
            print "<pre>"; print_r($e); print "</pre>"; exit;
        }
    } catch (PDOException $e) {
        print "<pre>"; print_r($e); print "</pre>"; exit;
    }


function buffer_flush(){

    echo str_pad('', 512);
    echo '<!-- -->';

    if(ob_get_length()){

        @ob_flush();
        @flush();
        @ob_end_flush();

    }

    @ob_start();

}
?> 
</body>
</html>

编辑:

一些分析:

这个 INSERT 每张非相似图片运行 100 次(到目前为止,每 6 次大约 5 次)。完成 while 循环通常需要 0.5 到 0.9 秒,每个 INSERT 平均需要 0.007 秒。

$query = "INSERT INTO test_images.words 
        (pos_and_word, signature_id)
        VALUES 
        (:pos_and_word, :signature_id);";

$sth9 = $dbh->prepare($query);
$sth9->bindParam(':pos_and_word', $row['pos_and_word']);
$sth9->bindParam(':signature_id', $new_sig_id);
$sth9->execute();
DELETE FROM test_images_pending.stored_pictures WHERE picture_id = :picture_id;

select * from test_images_pending.stored_pictures WHERE picture_id = :picture_id

DELETE FROM test_images_pending.stored_pictures WHERE picture_id = :picture_id;

每张相似的图片平均需要 0.15 秒左右(约 1 分/6 分)。

编辑2:

通过这个基准测试:http ://we-love-php.blogspot.com/2012/08/mass-inserts-updates-sqlite-vs-mysql.html

只需简单地写入文本文件即可替换之前在编辑 1 中提到的慢速 while 循环,例如:

$inserts = array();
while ($row = $sth8->fetch(PDO::FETCH_ASSOC, PDO::FETCH_ORI_NEXT)) {
    $inserts[] = "(".$dbh->quote($row['pos_and_word']).", ".$dbh->quote($new_sig_id).")";
}
$query = "INSERT INTO imvu_images.words (pos_and_word, signature_id) VALUES " . implode(',',$inserts) . ";";            
file_put_contents("inserts.sql", $query."\n", FILE_APPEND);

让它更快。不过不是每秒 100 个,更像是 10-20 个。然后我可以稍后执行 SQL,它会立即运行而不会延迟。(这就是为什么我认为我的代码有问题)。我想要每秒 100 个的原因是因为我可以分析图像并将它们以每秒 30 个的速度插入到 1 个数据库中。以这种速度,我分析 200 万张图像并让它一张一张地插入比批量插入行要快。这似乎不对,服务器可以下载 30 张图像,分析 30 张图像,然后在 1 秒内执行 30 次插入,但仅执行这些各种 SQL 语句甚至无法匹配。

编辑3:

将 my.ini 更新为:

key_buffer_size=4000M
read_buffer_size=32M
read_rnd_buffer_size=200M
bulk_insert_buffer_size=1000M
myisam_max_sort_file_size=10000M
myisam_repair_threads=1
tmp_table_size = 1024M
max_heap_table_size = 1024M
join_buffer_size=8M
sort_buffer_size=8M
max_allowed_packet=32M
max_connect_errors=10
myisam_sort_buffer_size=256M
query_cache_limit=12M
query_cache_size=256M
query_cache_type=1

在不使用 file_put_contents hack 的情况下,这似乎将性能提高了 2 倍。尽管如此,每秒 5 条记录并没有减少它。

4

3 回答 3

1

这个过程如此缓慢的原因不是因为单个查询很慢 - 事实上,我对它的速度有多快感到惊讶 - 而是因为您正在处理数百万条记录,一次一条,通过循环遍历每条记录在您的外部结果集中。SQL 擅长的是一次性处理数百万条记录。

您的代码中有太多业务逻辑,我不想为您重写整个代码,但我认为您希望按照以下方式重写代码

INSERT INTO test_images.pictures
      (id, digest) 
SELECT id, digest
from  test_images_pending.pictures
where id not in 
   (select id from test_images.pictures)

对其他表执行相同操作。这应该运行得非常快——如果你有一个好的索引方案,你几乎肯定会受到 I/O 限制。您绝对应该达到每秒超​​过 2 条记录!

于 2013-01-11T14:52:09.670 回答
0

编辑:主要问题是正在插入的源表上的索引。建议在进行批量插入之前删除任何不需要的索引,然后重建后缀。

通过调整 mysql 设置和以下代码的组合,我能够在 30 秒左右的时间内获得重复图像(连接部分)以执行 50,000 次,25 秒只是 JOIN 操作。

第二部分我使用的是 NOT IN,这是大部分时间发生的地方,但它以每秒 800 条记录的速度插入,所以它超出了我的目标。

我将把这个问题留得更久,看看它是否可以进一步优化,因为我有 3900 万条记录要处理。

<html>
<head>
      <link href="../css/print.css" rel="stylesheet" type="text/css" media="print" /> <!-- siehe screen.css -->
    <link href="../css/screen.css" rel="stylesheet" type="text/css" media="screen, projection" /> 
    <!--[if lte IE 6]><link rel="stylesheet" href="../css/ielte6.css" type="text/css" media="screen" /><![endif]--> 
</head>
<body>
 <?php
    ini_set('max_execution_time', 0);
    $benchmark = false;
    $delete = false;
    $dbh = new PDO("mysql:host=127.0.0.1;port=3306;dbname=test_images_pending;charset=utf-8", "root", "");
    $dbh->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION );
    $dbh->setAttribute(PDO::ATTR_AUTOCOMMIT, FALSE);

    $timers = array();

    try {
        $query = "SELECT * FROM test_images.pictures
                INNER JOIN test_images_pending.pictures
                USING ( digest )";

        $sth = $dbh->prepare($query);
        $sth->execute();

        while ($join_rows = $sth->fetch(PDO::FETCH_NUM, PDO::FETCH_ORI_NEXT)) {

            $digest =  $join_rows[0];
            $correct_pic_id = $join_rows[1];
            $wrong_pic_id = $join_rows[2];


            try {
                $dbh->beginTransaction(); 



                $query = "INSERT INTO test_images.stored_pictures 
                                (url, pid, num, updated_at, created_at, picture_id) 
                          SELECT 
                                url, pid, num, updated_at, created_at, :correct_pic_id FROM test_images_pending.stored_pictures WHERE picture_id = :wrong_pic_id;";

                $sth4 = $dbh->prepare($query);
                $sth4->bindParam(':correct_pic_id', $correct_pic_id);
                $sth4->bindParam(':wrong_pic_id', $wrong_pic_id);
                $sth4->execute();


                $dbh->commit(); 
            } catch (PDOException $e) {
                $dbh->rollback(); 
                print "<pre>"; print_r($e); print "</pre>"; exit;
            }
        }

    } catch (PDOException $e) {
        print "<pre>"; print_r($e); print "</pre>"; exit;
    }





    try {


        $query = "SELECT COUNT(id) FROM  `signatures` WHERE (`id` -  `picture_id` !=0)  ";
        $sth = $dbh->prepare($query);
        $sth->execute();

        $count = $sth->fetchColumn();
        if($count > 0) {
            die("we got a sig that aint matching its pic_id, we cant assume sig_id = pic_id. Back to drawing board");
        }
        $sth = null;


        $query = "  SELECT  digest, id
                    FROM    test_images_pending.pictures
                    WHERE   digest NOT IN
                        (
                        SELECT  digest
                        FROM    test_images.pictures
                        )"; 
        $sth = $dbh->prepare($query);
        $sth->execute();

        while ($not_in_rows = $sth->fetch(PDO::FETCH_NUM, PDO::FETCH_ORI_NEXT)) {

            $digest =  $not_in_rows[0];
            $wrong_pic_id = $not_in_rows[1];


            try {
                $dbh->beginTransaction(); 

                $query = "INSERT INTO test_images.pictures
                            (id, digest) 
                          VALUES 
                            (default, :digest);";

                $sth2 = $dbh->prepare($query);
                $sth2->bindParam(':digest', $digest);
                $sth2->execute();

                $new_pic_id = $dbh->lastInsertId();



                $query = "INSERT INTO test_images.stored_pictures 
                                (url, pid, num, updated_at, created_at, picture_id) 
                          SELECT 
                                url, pid, num, updated_at, created_at, :new_pic_id FROM test_images_pending.stored_pictures WHERE picture_id = :wrong_pic_id;";

                $sth3 = $dbh->prepare($query);
                $sth3->bindParam(':new_pic_id', $new_pic_id);
                $sth3->bindParam(':wrong_pic_id', $wrong_pic_id);
                $sth3->execute();



                $query = "INSERT INTO test_images.signatures 
                                (compressed_signature, picture_id) 
                          SELECT 
                                compressed_signature, :new_pic_id FROM test_images_pending.signatures WHERE picture_id = :wrong_pic_id;";

                $sth4 = $dbh->prepare($query);
                $sth4->bindParam(':new_pic_id', $new_pic_id);
                $sth4->bindParam(':wrong_pic_id', $wrong_pic_id);
                $sth4->execute();
                $new_sig_id = $dbh->lastInsertId();


                $query = "INSERT INTO test_images.words 
                            (pos_and_word, signature_id)
                          SELECT 
                            pos_and_word, :new_sig_id FROM test_images_pending.words WHERE signature_id = :old_sig_id

                            ";

                $sth9 = $dbh->prepare($query);
                $sth9->bindParam(':old_sig_id', $wrong_pic_id);
                $sth9->bindParam(':new_sig_id', $new_sig_id);
                $sth9->execute();



                $dbh->commit(); 
            } catch (PDOException $e) {
                $dbh->rollback(); 
                print "<pre>"; print_r($e); print "</pre>"; exit;
            }
        }
    } catch (PDOException $e) {
        print "<pre>"; print_r($e); print "</pre>"; exit;
    }

function buffer_flush(){

    echo str_pad('', 512);
    echo '<!-- -->';

    if(ob_get_length()){

        @ob_flush();
        @flush();
        @ob_end_flush();

    }

    @ob_start();

}
 ?> 
</body>
</html>
于 2013-01-12T06:01:40.697 回答
0

为什么不能使用 Mysql 存储过程?它们直接在 Mysql 服务器中执行,比从 php 执行查询要快。 http://dev.mysql.com/doc/refman/5.0/en/create-procedure.html

像这样从 php 调用存储过程:

$res = mysql_query('call sp_sel_test()');
if ($res === FALSE) {
    die(mysql_error());
}

您需要在连接时设置客户端标志,以便在 php 中使用存储过程。使用这个:mysql_connect($this->h,$this->u,$this->p,false,65536);

有关更多详细信息,请参阅MySQL 客户端标志

于 2013-01-11T14:34:09.717 回答