1

我正在使用亚马逊 PHP SDK V3.13.1,我必须在 30 秒内发送至少 10,000 条推送通知。现在我正在使用publishAsync它更快的方法,但我仍然没有及时发送。
所以我已经实现了套接字并每次发送一堆 3500 推送。以下是我发送套接字请求的控制器功能。

$parts = parse_url(base_url() . "welcome/send_signal");
for ($i = 1; $i <= 10000; $i++) {
    $device_type_ids_arr[$i]['token_id'] = "User token";
    $device_type_ids_arr[$i]['arn'] = "User arn";
    $device_type_ids_arr[$i]['member_id'] = $i;
    if ((count($device_type_ids_arr) == 3500) || $i == 10000) {
        $postData['devices'] = $device_type_ids_arr;
        $postData['pushSet'] = $pushSet;
        $postData['push_content'] = $push_content;
        $post_string = http_build_query($postData);
        $device_type_ids_arr = array();
        $fp = fsockopen($parts['host'], isset($parts['port']) ? $parts['port'] : 80, $errno, $errstr, 600);
        if (!$fp) {
            echo "Some thing Problem";
        }
        $out = "POST " . $parts['path'] . " HTTP/1.1\r\n";
        $out .= "Host: " . $parts['host'] . "\r\n";
        $out .= "User-Agent: " . $_SERVER['HTTP_USER_AGENT'] . "\r\n";
        $out .= "Content-Type: application/x-www-form-urlencoded\r\n";
        $out .= "Content-Length: " . strlen($post_string) . "\r\n";
        $out .= "Connection: Close\r\n\r\n";
        $out .= $post_string;
        fwrite($fp, $out);
        fclose($fp);
    }
}  

以下是我的函数,它接收套接字数据并发送推送通知。

$sns = new Aws\Sns\SnsClient(array(
    'version' => 'latest',
    'key' => "my sns key",
    'secret' => "secret",
    'region' => "region",
    'profile' => "amazon_user_profile",
    'debug' => false,
    'http' => array('verify' => false)
        ));
foreach ($device_id_arr as $device_detail) {
    try {
        $promises[] = $sns->publishAsync(array(
            'Message' => '{ "GCM": "{\"data\": { \"message\": \"Hello user\" } }"}',
            'MessageStructure' => 'json',
            'TargetArn' => "member sns arn"
        ));
    } catch (Exception $e) {
        $errorMsg = $e->getMessage();
    }
}
$results = \GuzzleHttp\Promise\settle($promises)->wait(TRUE);
$fp = fopen("test_parallel.txt", "a+");
fwrite($fp, "result:" . print_r($results, true) . "\r\n");
fclose($fp);  

当我发送 10 条通知时,这工作正常,但是当我发送 3500 次推送时,它不起作用并且没有给我任何回应。我也试过这个方法。带有 Guzzle 的 MultiCurl 的 Amazon AWS PHP 开发工具包?但它给了我错误Argument 1 passed to Aws\AwsClient::execute() must implement interface Aws\CommandInterface, array given

$sns = new Aws\Sns\SnsClient(array(
    'version' => 'latest',
    'key' => "my sns key",
    'secret' => "secret",
    'region' => "region",
    'profile' => "amazon_user_profile",
    'debug' => false,
    'http' => array('verify' => false)
        ));
foreach ($device_id_arr as $device_detail) {
    try {
        $publishCommands[] = $sns->getCommand('Publish', array(
            "Message" => '{ "GCM": "{\"data\": { \"message\": \"' . $push_content . '\", \"type\": \"' . PUSH_TYPE_SIGNAL . '\" } }"}',
            "MessageStructure" => "json",
            "TargetArn" => $device_detail['arn']
        ));
    } catch (Exception $e) {
        $errorMsg = $e->getMessage();
    }
}
try {
    $successfulCommands = $sns->execute($publishCommands);
    $failedCommands = array();
} catch (\Guzzle\Service\Exception\CommandTransferException $e) {
    $successfulCommands = $e->getSuccessfulCommands();
    $failedCommands = $e->getFailedCommands();
}

foreach ($failedCommands as $failedCommand) {
    $fp = fopen("test_parallel4.txt", "a+");
    fwrite($fp, "result:" . print_r($result, true) . "\r\n");
    fclose($fp);
}

$messageIds = array();
foreach ($successfulCommands as $successfulCommand) {
    $messageIds[] = $successfulCommand->getResult()->get('MessageId');
}

那么有人对此有解决方案吗?我主要关心的是在 30 秒内发送数千条推送通知。

4

1 回答 1

0

最后,我得到了使用multiple curl in php. 这是并行执行的,这也取决于您的服务器和互联网速度。以下是我的controller功能。

public function index() {
    for ($i = 1; $i <= 10000; $i++) {
        $device_type_ids_arr[$i]['member_id'] = $i;
        $device_type_ids_arr[$i]['arn'] = "member arn or empty";
        $device_type_ids_arr[$i]['token_id'] = "member token";
    }

    $start = microtime(true);
    /* Chunk members into 500 bunch and crete multiple curl request */
    $_datas = array_chunk($device_type_ids_arr, 500);
    $mh = curl_multi_init();
    $handles = array();
    foreach ($_datas as $batchPart) {
        $ch = curl_init();
        $postData['devices'] = $batchPart; //device array
        $postData['push_content'] = "Push Message";
        $data_string = json_encode($postData);
        curl_setopt($ch, CURLOPT_URL, base_url() . "Welcome/send_push_notification");
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
        curl_setopt($ch, CURLOPT_USERAGENT, 'User-Agent: curl/7.39.0');
        curl_setopt($ch, CURLOPT_POSTFIELDS, $data_string);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
        curl_setopt($ch, CURLOPT_FOLLOWLOCATION, false);
        curl_setopt($ch, CURLOPT_HTTPHEADER, array(
            'Content-Type: application/json',
            'Content-Length: ' . strlen($data_string))
        );
        curl_multi_add_handle($mh, $ch);
        $handles[] = $ch;
    }
    // execute requests and poll periodically until all have completed
    $isRunning = null;
    do {
        curl_multi_exec($mh, $isRunning);
        usleep(250000);
    } while ($isRunning > 0);

    curl_multi_close($mh);
    printf("Elapsed time: %.2f seconds\n", microtime(true) - $start);
}  

以下是并行发送推送通知的功能。我还解释了如何处理Exception并确保将通知发送给所有/最大用户。

 public function send_push_notification() {
        require_once APPPATH . 'libraries/aws-sdk/aws-autoloader.php';
        $pipeArr = json_decode(file_get_contents('php://input'), true);
        $push_content = $pipeArr["push_content"];
        $device_id_arr = $pipeArr["devices"];
        $sns = new Aws\Sns\SnsClient(array(
            'version' => 'latest',
            'key' => "Amazon account key",
            'secret' => "Amazon account secret",
            'region' => "Region",
            'profile' => "Amazon account user profile",
            'debug' => false,
            'http' => array('verify' => false)
        ));
        $appArn = "Application Arn";
        $promises = $results = $retArr = array();
        //start sending loop of 500 members asynchroniously
        foreach ($device_id_arr as $key => $device_detail) {
            $arn = $device_detail['arn'];
            $token = $device_detail['token_id'];
            $userid = $device_detail['member_id'];
            //If member arn is empty then register it on amazon and then update into databse
            if (empty($arn)) {
                try {
                    $updatedArn = $sns->createPlatformEndpoint(array('PlatformApplicationArn' => $appArn, 'Token' => $token));
                    $arn = isset($updatedArn['EndpointArn']) ? $updatedArn['EndpointArn'] : "";
                    //update member detail with new arn into database
                    ($arn != "") ?/* DB query goes here */ : "";
                } catch (Exception $e) {
                    $errorMsg = $e->getMessage();
                }
            }
            if (!empty($arn)) {
                try {
                    $promises[$userid] = $sns->publishAsync(array(
                        'Message' => '{ "GCM": "{\"data\": { \"message\": \"' . $push_content . '\"}}"}',
                        'MessageStructure' => 'json',
                        'TargetArn' => $arn
                    ));
                    $promises[$userid]->arn = $arn;
                    $promises[$userid]->token = $token;
                } catch (Exception $e) {
                    $errorMsg = $e->getMessage();
                }
            }
        }
        //This is Async method to send push and get result of each member with message Id
        $results = \GuzzleHttp\Promise\settle($promises)->wait(TRUE);

        /* Handle result and get message Id */
        foreach ($results as $key => $value) {
            /* If any of push notification is fail then check for error and fix it */
            if (isset($value['reason'])) {
                $message = $value['reason']->getMessage();
                $token = (isset($promises[$key]->token)) ? $promises[$key]->token : "";
                $arn = (isset($promises[$key]->arn)) ? $promises[$key]->arn : "";
                if (strpos($message, "Endpoint is disabled") !== false && $token != "" && $arn != "") {
                    try {
                        $res = $sns->setEndpointAttributes(array(
                            'Attributes' => array("Token" => $promises[$key]->token, "Enabled" => "true"),
                            'EndpointArn' => $arn
                        ));
                    } catch (Exception $e) {
                        $errorMsg = $e->getMessage();
                    }
                }
                if (strpos($message, "No endpoint found for the target arn specified") !== false && $token != "") {
                    try {
                        $updatedArn = $sns->createPlatformEndpoint(array('PlatformApplicationArn' => $appArn, 'Token' => $token));
                        $arn = isset($updatedArn['EndpointArn']) ? $updatedArn['EndpointArn'] : "";
                        //update member detail with new arn into database
                        ($arn != "") ?/* DB query goes here */ : "";
                    } catch (Exception $e) {
                        $errorMsg = $e->getMessage();
                    }
                }
                if (!empty($arn)) {
                    try {
                        $publishRes = $sns->publish(array(
                            'Message' => '{ "GCM": "{\"data\": { \"message\": \"' . $push_content . '\", \"type\": \"' . PUSH_TYPE_SIGNAL . '\" } }"}',
                            'MessageStructure' => 'json',
                            'TargetArn' => $arn
                        ));
                        $retArr[$key] = $publishRes->get("MessageId");
                    } catch (Exception $e) {
                        $errorMsg = $e->getMessage();
                        /* Final error of push notification not sent */
                        $newFile = fopen("error_push_not_sent.txt", "a+");
                        fwrite($newFile, "Member Id:" . $key . "\r\nARN:" . $arn . "\r\nToken:" . $token . "\r\n" . $errorMsg . "\r\n");
                        fclose($newFile);
                    }
                }
            } else {
                $retArr[$key] = $results[$key]['value']->get("MessageId");
            }
        }

        /* Get message Id of amazon and insert record into database */
        if (isset($retArr) && !empty($retArr)) {
            foreach ($retArr as $member_id => $message_id) {
                /* DB query goes here OR you can generate batch query here and then execute */
            }
        }
    }
于 2017-08-04T06:35:15.473 回答