我编写了一个在 EC2 上运行的关键字搜索程序，并已成功把输出保存到 S3 上。但它是单线程的，所以运行很慢。我想把它打包成自定义 JAR，在 EMR 上运行。有人能帮我把它改写成 Hadoop 程序，以便在 EMR 上运行吗？
我是 Hadoop 新手。我参考了下面两个仓库（repo），但没有成功。
https://github.com/commoncrawl/cc-warc-examples
https://github.com/commoncrawl/example-warc-java
之后我把这两个仓库的代码结合起来，写出了下面的程序。
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import org.apache.commons.io.IOUtils;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
/**
 * Scans WARC files stored on S3 for HTML responses containing any of a fixed
 * set of keywords, and uploads every matching page to S3 as its own object.
 *
 * <p>Files are processed one after another on a single thread.
 * {@link #processWarcFile} handles exactly one WARC key and depends on the
 * other keys only through the output sequence number it is given and
 * returns. That makes it the piece to run in parallel, for example from a
 * Hadoop mapper that receives a list of keys.
 */
public class S3BucketReader3 {

    /** Start of the HTML body; everything before it (the HTTP headers) is dropped. */
    private static final String DOCTYPE = "<!DOCTYPE html";

    /** WARC mime type of records that carry an HTTP response. */
    private static final String RESPONSE_MIMETYPE = "application/http; msgtype=response";

    /**
     * Lower-case search terms; a page is kept if its lower-cased text contains
     * any of them (plain substring match, so e.g. "rich" also matches "enrich").
     */
    private static final String[] KEYWORDS = {
        "gambler", "rich", "name list", "2nd rich generation",
        "2nd official generation", "gambler addict", "gamble", "shanxi",
        "macau", "rich businessman", "tour", "smoking"
    };

    /** Bucket that receives the matching pages. */
    private static final String OUTPUT_BUCKET = "mybucket";

    /** Key prefix ("folder") inside OUTPUT_BUCKET for the matching pages. */
    private static final String OUTPUT_PREFIX = "common_crawl/warcOutput/";

    /**
     * Lists every object under {@code prefix} in {@code bucketName} (following
     * all listing pages) and, for each {@code .warc.gz} file, uploads the HTML
     * of every keyword-matching response record.
     *
     * @param s3         client used to list the input and to write the output
     * @param bucketName bucket holding the WARC files (also used to read them)
     * @param prefix     key prefix to scan
     * @param max        maximum number of WARC files to process; -1 means no limit
     * @throws AmazonServiceException if S3 rejects a listing request
     * @throws AmazonClientException  if a listing request cannot be sent
     */
    static public void process(AmazonS3 s3, String bucketName, String prefix,
            int max) throws S3ServiceException, AmazonServiceException,
            AmazonClientException, InterruptedException {
        int processed = 0;   // WARC files attempted so far
        int fileCounter = 1; // sequence number of the next uploaded page
        // Client created with null credentials, used to read the input WARC files.
        S3Service s3s = new RestS3Service(null);
        // use a callback class for handling WARC record data:
        IProcessWarcRecord processor = new SampleProcessWarcRecord();

        ObjectListing list = s3.listObjects(bucketName, prefix);
        listing:
        while (true) {
            for (S3ObjectSummary summary : list.getObjectSummaries()) {
                String key = summary.getKey();
                System.out.println("+ key: " + processed + " " + key);
                if (!key.contains(".warc.gz")) {
                    continue;
                }
                fileCounter = processWarcFile(s3, s3s, bucketName, key, fileCounter);
                processed++;
                System.out.println("Count no: " + processed);
                if (max != -1 && processed >= max) {
                    // break (not return) so processor.done() below still runs
                    break listing;
                }
            }
            // Check truncation BEFORE fetching the next page; checking after the
            // fetch would skip the last page of results.
            if (!list.isTruncated()) {
                break;
            }
            list = s3.listNextBatchOfObjects(list);
        }
        // done processing all WARC records:
        processor.done();
    }

    /**
     * Scans one WARC file and uploads every keyword-matching HTML page. Any
     * failure is reported and swallowed so that one bad file does not abort
     * the whole scan.
     *
     * @param fileCounter sequence number to use for the first uploaded page
     * @return the sequence number to use for the next uploaded page
     */
    private static int processWarcFile(AmazonS3 s3, S3Service s3s,
            String bucketName, String key, int fileCounter) {
        InputStream in = null;
        ArchiveReader ar = null;
        try {
            S3Object f = s3s.getObject(bucketName, key, null, null, null, null, null, null);
            in = f.getDataInputStream();
            ar = WARCReaderFactory.get(key, in, true);
            for (ArchiveRecord r : ar) {
                String html = extractHtml(r);
                if (html == null || !containsKeyword(html)) {
                    continue;
                }
                writeToS3(s3, html, fileCounter);
                System.out.println("Files created number: " + fileCounter);
                fileCounter++;
            }
        } catch (Exception ex) {
            System.err.println("Failed while processing " + key);
            ex.printStackTrace();
        } finally {
            closeQuietly(ar);
            IOUtils.closeQuietly(in);
        }
        return fileCounter;
    }

    /**
     * Returns the text of an HTTP-response record starting at its
     * {@code <!DOCTYPE html} marker, or null if the record is not a response
     * or has no such marker. The marker search is case-sensitive, so pages
     * that declare {@code <!doctype html>} are skipped.
     */
    private static String extractHtml(ArchiveRecord r) throws IOException {
        String mimetype = r.getHeader().getMimetype();
        if (mimetype == null || !mimetype.contains(RESPONSE_MIMETYPE)) {
            return null;
        }
        // Read the record once; r.available() is assumed to report the
        // remaining record length.
        byte[] rawData = IOUtils.toByteArray(r, r.available());
        // Explicit UTF-8 so results do not depend on the JVM default charset;
        // pages in other encodings may not survive the decode/encode round trip.
        String content = new String(rawData, StandardCharsets.UTF_8);
        int start = content.indexOf(DOCTYPE);
        return start < 0 ? null : content.substring(start);
    }

    /** Returns true if the lower-cased {@code content} contains any of KEYWORDS. */
    private static boolean containsKeyword(String content) {
        // Locale.ROOT: locale-sensitive lower-casing (e.g. Turkish dotless i)
        // would make some keywords unmatchable.
        String lowerContent = content.toLowerCase(Locale.ROOT);
        for (String keyword : KEYWORDS) {
            if (lowerContent.contains(keyword)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Uploads {@code content} as UTF-8 under OUTPUT_PREFIX in OUTPUT_BUCKET.
     * Upload failures are printed and otherwise ignored.
     */
    private static void writeToS3(AmazonS3 s3, String content, int fileCounter) {
        byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(contentBytes.length);
        String dataFileName = "dataFile_" + fileCounter
                + "_" + System.currentTimeMillis() / 1000;
        try {
            // Bucket names cannot contain '/'; the "folder" belongs in the key.
            s3.putObject(OUTPUT_BUCKET, OUTPUT_PREFIX + dataFileName,
                    new ByteArrayInputStream(contentBytes), metadata);
        } catch (AmazonServiceException ase) {
            System.out.println("Error Message: " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code: " + ase.getErrorCode());
            System.out.println("Error Type: " + ase.getErrorType());
            System.out.println("Request ID: " + ase.getRequestId());
        } catch (AmazonClientException ace) {
            System.out.println("Error Message: " + ace.getMessage());
        }
    }

    /** Closes {@code reader}, ignoring a null reader and any close failure. */
    private static void closeQuietly(ArchiveReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException ignored) {
            // Best effort: the records have already been consumed.
        }
    }

    /**
     * Entry point. Usage: {@code S3BucketReader3 [bucket [prefix [max]]]};
     * missing arguments default to the original hard-coded crawl segment with
     * no file limit. Credentials come from the AWS_ACCESS_KEY_ID and
     * AWS_SECRET_ACCESS_KEY environment variables instead of source code.
     *
     * @throws IllegalStateException if either credential variable is unset
     */
    static public void main(String[] args) throws S3ServiceException,
            AmazonServiceException, AmazonClientException, InterruptedException {
        String awsAccessKey = System.getenv("AWS_ACCESS_KEY_ID");
        String awsSecretKey = System.getenv("AWS_SECRET_ACCESS_KEY");
        if (awsAccessKey == null || awsSecretKey == null) {
            throw new IllegalStateException(
                    "Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before running");
        }
        String bucketName = args.length > 0 ? args[0] : "aws-publicdatasets";
        String prefix = args.length > 1 ? args[1]
                : "common-crawl/crawl-data/CC-MAIN-2013-48";
        int max = args.length > 2 ? Integer.parseInt(args[2]) : -1;

        AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey,
                awsSecretKey);
        AmazonS3 s3 = new AmazonS3Client(credentials);
        process(s3, bucketName, prefix, max);
    }
}