
How to Implement a Multithreaded Java Web Crawler, in Code

A project of mine needed a crawler, so I put together a small web-crawler demo.

To build a high-performance crawler, the first consideration was to use Apache HttpClient for fetching and parsing pages. HttpClient makes it very easy to retrieve remote content from a URL, for example:

CloseableHttpClient httpclient = HttpClients.createDefault();
// The target URL was stripped when this article was republished; any page URL works here.
HttpGet httpget = new HttpGet("http://www.163.com/");
CloseableHttpResponse response = httpclient.execute(httpget);
try {
    HttpEntity entity = response.getEntity();
    if (entity != null) {
        long len = entity.getContentLength();
        if (len != -1 && len < 2048) {
            System.out.println(EntityUtils.toString(entity));
        } else {
            // Stream content out
        }
    }
} finally {
    response.close();
}

It can also handle page parsing, simulated logins and so on; it is quite a powerful toolkit.
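For example, a form-based login usually just means POSTing the form fields and then reusing the same client (and its cookie store) for later requests. A minimal sketch, with a hypothetical login URL and field names that are not part of the original demo:

public static void login() throws Exception {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    // Hypothetical login endpoint and form field names, for illustration only.
    HttpPost post = new HttpPost("http://www.example.com/login");
    List<NameValuePair> form = new ArrayList<NameValuePair>();
    form.add(new BasicNameValuePair("username", "user"));
    form.add(new BasicNameValuePair("password", "pass"));
    post.setEntity(new UrlEncodedFormEntity(form, "UTF-8"));
    CloseableHttpResponse response = httpclient.execute(post);
    try {
        // On success the session cookie stays in the client's cookie store,
        // so later requests made with the same client are authenticated.
        System.out.println(response.getStatusLine());
    } finally {
        response.close();
    }
}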

Next, a crawler or harvester has to collect and analyse a large number of URLs, so a NoSQL store is used to keep that efficient; Redis, Memcached and BerkeleyDB are all good options, and BerkeleyDB is the one chosen here. A traditional in-memory queue (or similar) would probably be faster, but it consumes a lot of memory, and a server with that much RAM is not always available.

Then every URL has to be filtered to decide whether it has already been read: visited URLs go into a "visited" database and unvisited ones into an "unvisited" database, which behaves much like a queue, so the same URL is never fetched twice. Going a step further, one could also check whether the page content itself is duplicated, to reduce the chance of reading identical pages.
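Reduced to its essentials, this visited/unvisited bookkeeping looks like the sketch below (plain in-memory collections are used here for clarity; the demo that follows stores both sides in BerkeleyDB):

class UrlDedup {
    private final Set<String> visited = new HashSet<String>();
    private final Queue<String> unvisited = new LinkedList<String>();

    // Only URLs that are neither visited nor already queued get scheduled.
    synchronized void enqueue(String url) {
        if (url != null && url.length() > 0
                && !visited.contains(url) && !unvisited.contains(url)) {
            unvisited.add(url);
        }
    }

    // Take the next unread URL and mark it as read so it is never fetched twice.
    synchronized String next() {
        String url = unvisited.poll();
        if (url != null) {
            visited.add(url);
        }
        return url;
    }
}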

After that, each page is parsed to extract the key content and the URLs it contains.

Finally, to keep performance up, the crawler runs multithreaded; with several servers, a distributed design could push performance even further.

Following this outline, I wrote a small program:

1. Configuration. The settings live in CrawlConfig (they could just as well be stored in an XML file); the target here is the 163 site.

(1)CrawlConfig.java

……

public class CrawlConfig {
    // The two path values below were stripped when the article was republished;
    // since the demo crawls the 163 site, something like the following is assumed.
    public static final String CRAWL_PATH = "http://www.163.com/";        // seed URL
    public static final String CRAWL_LIMIT_PATH = "http://www.163.com";   // prefix the crawl is limited to
    public static final String CRAWL_VISITED_FRONTIER = "d:\\cache\\hevisited";
    public static final String CRAWL_UNVISITED_FRONTIER = "d:\\cache\\heunvisited";
    public static final String CRAWL_DOWNLOAD_PATH = "d:\\download\\163\\";
    public static final int CRAWL_THREAD_NUM = 6;
}
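If the settings were stored in an XML file instead, as mentioned above, they could be loaded with java.util.Properties; a sketch, where the file name and key names are made up for illustration:

// Hypothetical crawl-config.xml in java.util.Properties XML format.
Properties props = new Properties();
InputStream in = new FileInputStream("crawl-config.xml");
try {
    props.loadFromXML(in);
} finally {
    in.close();
}
String crawlPath = props.getProperty("crawl.path");
int threadNum = Integer.parseInt(props.getProperty("crawl.thread.num", "6"));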

(2) CrawlUrl.java is the object that represents a URL; besides the URL itself it could of course carry other information.

……

public class CrawlUrl implements Serializable {
    private static final long serialVersionUID = 79332323432323L;

    public CrawlUrl() {
    }

    private String oriUrl;   // original URL
    private String url;      // URL

    public String getOriUrl() {
        return oriUrl;
    }

    public void setOriUrl(String oriUrl) {
        this.oriUrl = oriUrl;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }
}

(3) LinkFilter.java acts as the URL filter.

public interface LinkFilter {
    public boolean accept(String url);
}
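A typical implementation simply restricts the crawl to the target site by URL prefix, for example:

// Accept only URLs under the configured site prefix.
LinkFilter filter = new LinkFilter() {
    public boolean accept(String url) {
        return url != null && url.startsWith(CrawlConfig.CRAWL_LIMIT_PATH);
    }
};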

2. Code for accessing BerkeleyDB (install BerkeleyDB first and add the BerkeleyDB JE jar to the project).

(1)AbstractFrontier.java

……

public abstract class AbstractFrontier {
    private Environment env;
    private static String CLASS_CATALOG = "java_class_catalog";
    protected StoredClassCatalog javaCatalog;
    protected Database catalogdatabase;
    protected Database database = null;
    protected String homeDirectory = null;

    public AbstractFrontier(String homeDirectory) throws DatabaseException,
            FileNotFoundException {
        this.homeDirectory = homeDirectory;
        System.out.println("open environment: " + homeDirectory);
        // Configure and open the environment
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setTransactional(true);
        envConfig.setAllowCreate(true);
        env = new Environment(new File(homeDirectory), envConfig);
        // Configure the class-catalog database
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        dbConfig.setAllowCreate(true);
        // Open the class-catalog database
        catalogdatabase = env.openDatabase(null, CLASS_CATALOG, dbConfig);
        javaCatalog = new StoredClassCatalog(catalogdatabase);
        // Configure the URL database
        DatabaseConfig dbConfigTe = new DatabaseConfig();
        dbConfigTe.setTransactional(true);
        dbConfigTe.setAllowCreate(true);
        // Open the URL database
        database = env.openDatabase(null, "URL", dbConfigTe);
    }

    public void close() throws DatabaseException {
        database.close();
        javaCatalog.close();
        env.close();
    }

    protected abstract void put(Object key, Object value);

    protected abstract Object get(Object key);

    protected abstract Object delete(Object key);
}

(2)Frontier.java

……

public interface Frontier {
    public CrawlUrl getNext() throws Exception;
    public boolean putUrl(CrawlUrl url) throws Exception;
}
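For comparison, the "traditional queue" alternative mentioned at the start (and the single-threaded LinkQueue baseline in the benchmark at the end, whose code the article does not show) amounts to a trivial in-memory Frontier, roughly:

// Minimal in-memory Frontier sketch: fast, but everything lives in RAM and
// nothing survives a restart, unlike the BerkeleyDB-backed version below.
public class QueueFrontier implements Frontier {
    private final Queue<CrawlUrl> queue = new LinkedList<CrawlUrl>();
    private final Set<String> seen = new HashSet<String>();

    public synchronized boolean putUrl(CrawlUrl url) {
        if (url.getOriUrl() == null || !seen.add(url.getOriUrl())) {
            return false;   // null, already queued, or already visited
        }
        queue.add(url);
        return true;
    }

    public synchronized CrawlUrl getNext() {
        return queue.poll();   // null when the queue is empty
    }
}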

(3) BDBFrontier.java, written with concurrency in mind.

……

public class BDBFrontier extends AbstractFrontier implements Frontier {
    private StoredMap pendingUrisDB = null;
    public static int threads = CrawlConfig.CRAWL_THREAD_NUM;

    /**
     * Creates a new instance of BDBFrontier.
     *
     * @param homeDirectory
     * @throws DatabaseException
     * @throws FileNotFoundException
     */
    public BDBFrontier(String homeDirectory) throws DatabaseException,
            FileNotFoundException {
        super(homeDirectory);
        EntryBinding keyBinding = new SerialBinding(javaCatalog, String.class);
        EntryBinding valueBinding = new SerialBinding(javaCatalog, CrawlUrl.class);
        pendingUrisDB = new StoredMap(database, keyBinding, valueBinding, true);
    }

    /**
     * clearAll: empty the database.
     */
    public void clearAll() {
        if (!pendingUrisDB.isEmpty())
            pendingUrisDB.clear();
    }

    /**
     * Fetch the next record.
     * @see com.fc.frontier.Frontier#getNext()
     */
    @Override
    public synchronized CrawlUrl getNext() throws Exception {
        CrawlUrl result = null;
        while (true) {
            if (!pendingUrisDB.isEmpty()) {
                Set entrys = pendingUrisDB.entrySet();
                Entry<String, CrawlUrl> entry =
                        (Entry<String, CrawlUrl>) pendingUrisDB.entrySet().iterator().next();
                result = entry.getValue();   // next record
                delete(entry.getKey());      // remove the current record
                System.out.println("get:" + homeDirectory + entrys);
                return result;
            } else {
                threads--;
                if (threads > 0) {
                    wait();
                    threads++;
                } else {
                    notifyAll();
                    return null;
                }
            }
        }
    }

    /**
     * Store a URL.
     * @see com.fc.frontier.Frontier#putUrl(com.fc.CrawlUrl)
     */
    @Override
    public synchronized boolean putUrl(CrawlUrl url) throws Exception {
        if (url.getOriUrl() != null && !url.getOriUrl().equals("")
                && !pendingUrisDB.containsKey(url.getOriUrl())) {
            Set entrys = pendingUrisDB.entrySet();
            put(url.getOriUrl(), url);
            notifyAll();
            System.out.println("put:" + homeDirectory + entrys);
            return true;
        }
        return false;
    }

    public boolean contains(Object key) {
        if (pendingUrisDB.containsKey(key))
            return true;
        return false;
    }

    /**
     * Write to the database.
     * @see com.fc.frontier.AbstractFrontier#put(java.lang.Object, java.lang.Object)
     */
    @Override
    protected synchronized void put(Object key, Object value) {
        pendingUrisDB.put(key, value);
    }

    /**
     * Read from the database.
     * @see com.fc.frontier.AbstractFrontier#get(java.lang.Object)
     */
    @Override
    protected synchronized Object get(Object key) {
        return pendingUrisDB.get(key);
    }

    /**
     * Delete from the database.
     * @see com.fc.frontier.AbstractFrontier#delete(java.lang.Object)
     */
    @Override
    protected synchronized Object delete(Object key) {
        return pendingUrisDB.remove(key);
    }

    /**
     * calculateUrl: canonicalise the URL; a compression or hashing algorithm
     * could be plugged in here.
     */
    private String calculateUrl(String url) {
        return url;
    }

    public static void main(String[] strs) {
        try {
            BDBFrontier bdbFrontier = new BDBFrontier("d:\\cache");
            CrawlUrl url = new CrawlUrl();
            // The test URL was stripped when the article was republished; any URL will do.
            url.setOriUrl("http://www.163.com/");
            bdbFrontier.putUrl(url);
            System.out.println(((CrawlUrl) bdbFrontier.getNext()).getOriUrl());
            bdbFrontier.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. The core: URL retrieval, page parsing and page downloading. Parsing and downloading are the time-consuming parts.

(1) RetrievePage.java implements URL access and page downloading.

……

public class RetrievePage {
    private static String USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7; .NET CLR 1.1.4322; CIBA; .NET CLR 2.0.50727)";
    private static String DEFAULT_CHARSET = "GB2312,utf-8;q=0.7,*;q=0.7";
    private static String ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8";

    /**
     * Download a page.
     * @param path
     * @return
     * @throws Exception
     * @throws IOException
     */
    public static boolean downloadPage(String path) throws Exception, IOException {
        CloseableHttpClient httpclient = HttpClients.createDefault();
        HttpGet httpget = new HttpGet(path);
        httpget.addHeader("Accept-Charset", DEFAULT_CHARSET);
        // httpget.addHeader("Host", host);
        httpget.addHeader("Accept", ACCEPT);
        httpget.addHeader("User-Agent", USER_AGENT);
        RequestConfig requestConfig = RequestConfig.custom()   // set the timeouts
                .setSocketTimeout(1000)
                .setConnectTimeout(1000)
                .build();
        httpget.setConfig(requestConfig);
        CloseableHttpResponse response = httpclient.execute(httpget);
        try {
            HttpEntity entity = response.getEntity();
            StatusLine statusLine = response.getStatusLine();
            if (statusLine.getStatusCode() == HttpStatus.SC_MOVED_PERMANENTLY ||   // redirect?
                    statusLine.getStatusCode() == HttpStatus.SC_MOVED_TEMPORARILY ||
                    statusLine.getStatusCode() == HttpStatus.SC_SEE_OTHER ||
                    statusLine.getStatusCode() == HttpStatus.SC_TEMPORARY_REDIRECT) {
                Header header = response.getFirstHeader("Location");
                if (header != null) {
                    String newUrl = header.getValue();
                    if (newUrl == null || newUrl.equals("")) {
                        newUrl = "/";
                    }
                    // The redirect target is only captured here, not fetched.
                    HttpGet redirect = new HttpGet(newUrl);
                }
            }
            if (statusLine.getStatusCode() == HttpStatus.SC_OK) {   // successful response
                if (entity == null) {
                    throw new ClientProtocolException("Response contains no content");
                } else {
                    InputStream instream = entity.getContent();
                    String filename = getFilenameByUrl(path, entity.getContentType().getValue());
                    OutputStream outstream = new FileOutputStream(
                            CrawlConfig.CRAWL_DOWNLOAD_PATH + filename);   // write to disk
                    try {
                        // System.out.println(convertStreamToString(instream));
                        int tempByte = -1;
                        while ((tempByte = instream.read()) != -1) {
                            outstream.write(tempByte);
                        }
                        return true;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return false;
                    } finally {
                        if (instream != null) {
                            instream.close();
                        }
                        if (outstream != null) {
                            outstream.close();
                        }
                    }
                }
            }
            return false;
        } finally {
            response.close();
        }
    }

    public static String getFilenameByUrl(String url, String contentType) {
        url = url.substring(7);   // drop the leading "http://"
        if (contentType.indexOf("html") != -1) {
            url = url.replaceAll("[\\?/:*|<>\"]", "_") + ".html";
            return url;
        } else {
            url = url.replaceAll("[\\?/:*|<>\"]", "_") + "."
                    + contentType.substring(contentType.lastIndexOf('/') + 1);
            return url;
        }
    }

    /**
     * Read a stream into a string.
     * @param is
     * @return
     */
    public static String convertStreamToString(InputStream is) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        StringBuilder sb = new StringBuilder();
        String line = null;
        try {
            while ((line = reader.readLine()) != null) {
                sb.append(line + "\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        try {
            System.out.println("download started");
            // The test URL was stripped when the article was republished; any page URL works.
            RetrievePage.downloadPage("http://www.163.com/");
            System.out.println("download finished");
        } catch (HttpException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
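Note that the redirect branch in downloadPage only builds the new HttpGet and never executes it. With HttpClient 4.x it is usually easier to let the client follow redirects on its own; a small sketch of that alternative:

// Let HttpClient follow 3xx redirects automatically. LaxRedirectStrategy also
// redirects POST requests; plain GET redirects are already handled by the default client.
CloseableHttpClient httpclient = HttpClients.custom()
        .setRedirectStrategy(new LaxRedirectStrategy())
        .build();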

(2) HtmlParserTool.java parses a page and extracts its URLs.

……

public class HtmlParserTool {
    public static Set<String> extractLinks(String url, LinkFilter filter) {
        Set<String> links = new HashSet<String>();
        try {
            Parser parser = new Parser(url);
            parser.setEncoding("gb2312");
            NodeFilter frameFilter = new NodeFilter() {   // filter for <frame> nodes
                public boolean accept(Node node) {
                    if (node.getText().startsWith("frame src=")) {
                        return true;
                    } else {
                        return false;
                    }
                }
            };
            OrFilter linkFilter = new OrFilter(new NodeClassFilter(LinkTag.class), frameFilter);
            NodeList list = parser.extractAllNodesThatMatch(linkFilter);   // all matching nodes
            for (int i = 0; i < list.size(); i++) {
                Node tag = list.elementAt(i);
                if (tag instanceof LinkTag) {   // <a> link
                    LinkTag linkTag = (LinkTag) tag;
                    String linkUrl = linkTag.getLink();      // URL
                    String text = linkTag.getLinkText();     // link text
                    System.out.println(linkUrl + "**********" + text);
                    if (filter.accept(linkUrl))
                        links.add(linkUrl);
                } else if (tag instanceof ImageTag) {   // <img> tag, image link
                    ImageTag image = (ImageTag) list.elementAt(i);
                    System.out.print(image.getImageURL() + "********");   // image URL
                    System.out.println(image.getText());                  // image text
                    if (filter.accept(image.getImageURL()))
                        links.add(image.getImageURL());
                } else {   // <frame> tag
                    // extract the link from the src attribute, e.g. <frame src="test.html"/>
                    String frame = tag.getText();
                    int start = frame.indexOf("src=");
                    frame = frame.substring(start);
                    int end = frame.indexOf(" ");
                    if (end == -1)
                        end = frame.indexOf(">");
                    frame = frame.substring(5, end - 1);
                    System.out.println(frame);
                    if (filter.accept(frame))
                        links.add(frame);
                }
            }
            return links;
        } catch (ParserException e) {
            e.printStackTrace();
            return null;
        }
    }
}
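Used together with a LinkFilter, a call looks like this (the seed URL here is only an example):

// Collect all links on the seed page that stay on the target site.
Set<String> links = HtmlParserTool.extractLinks("http://www.163.com/", new LinkFilter() {
    public boolean accept(String url) {
        return url.startsWith(CrawlConfig.CRAWL_LIMIT_PATH);
    }
});
System.out.println("found " + links.size() + " links");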

(3) MyCrawler.java does the actual crawling. It uses a breadth-first crawl policy; a more elaborate version would also limit the crawl depth. The filter here is essentially a domain-prefix check. In a multithreaded setting the shared data also has to be kept in sync.

……

public class MyCrawler {
    public static BDBFrontier visitedFrontier;
    public static BDBFrontier unvisitedFrontier;
    private static int num = 0;

    public MyCrawler() {
        try {
            if (visitedFrontier == null) {
                // visited URLs are kept in the NoSQL (BerkeleyDB) store
                visitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_VISITED_FRONTIER);
                visitedFrontier.clearAll();
            }
            if (unvisitedFrontier == null) {
                unvisitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_UNVISITED_FRONTIER);
                unvisitedFrontier.clearAll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void initCrawlerWithSeeds(String[] seeds) {
        synchronized (this) {
            try {
                for (int i = 0; i < seeds.length; i++) {
                    CrawlUrl url = new CrawlUrl();   // stored via BerkeleyDB
                    url.setOriUrl(seeds[i]);
                    unvisitedFrontier.putUrl(url);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void crawling(String[] seeds, int threadId) {
        try {
            LinkFilter filter = new LinkFilter() {
                @Override
                public boolean accept(String url) {
                    Pattern pattern = Pattern.compile("^((https|http|ftp|rtsp|mms)?://)"
                            + "+(([0-9a-z_!~*'().&=+$%-]+: )?[0-9a-z_!~*'().&=+$%-]+@)?"
                            + "(([0-9]{1,3}\\.){3}[0-9]{1,3}"
                            + "|"
                            + "([0-9a-z_!~*'()-]+\\.)*"
                            + "([0-9a-z][0-9a-z-]{0,61})?[0-9a-z]\\."
                            + "[a-z]{2,6})"
                            + "(:[0-9]{1,4})?"
                            + "((/?)|"
                            + "(/[0-9a-z_!~*'().;?:@&=+$,%#-]+)+/?)$");
                    Matcher matcher = pattern.matcher(url);
                    boolean isMatch = matcher.matches();
                    if (isMatch && url.startsWith(CrawlConfig.CRAWL_LIMIT_PATH)) {
                        return true;
                    } else {
                        return false;
                    }
                }
            };
            initCrawlerWithSeeds(seeds);
            // the frontier is backed by BerkeleyDB
            CrawlUrl visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
            // visitedFrontier.putUrl(visitedCrawlUrl);
            do {
                System.out.println("thread: " + threadId);
                if (visitedCrawlUrl == null) {
                    continue;
                }
                String visitedUrl = visitedCrawlUrl.getOriUrl();
                if (visitedFrontier.contains(visitedUrl)) {   // keep threads in sync: skip URLs already visited
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    continue;
                }
                visitedFrontier.putUrl(visitedCrawlUrl);
                if (null == visitedUrl || "".equals(visitedUrl.trim())) {   // fetched URL is empty
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    continue;
                }
                try {
                    RetrievePage.downloadPage(visitedUrl);   // download the page
                    Set<String> links = HtmlParserTool.extractLinks(visitedUrl, filter);
                    for (String link : links) {
                        if (!visitedFrontier.contains(link)
                                && !unvisitedFrontier.contains(link)) {
                            CrawlUrl unvisitedCrawlUrl = new CrawlUrl();
                            unvisitedCrawlUrl.setOriUrl(link);
                            unvisitedFrontier.putUrl(unvisitedCrawlUrl);
                        }
                    }
                } catch (ConnectTimeoutException e) {   // on timeout, move on to the next URL
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    num++;
                    e.printStackTrace();
                    continue;
                } catch (SocketTimeoutException e) {
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    num++;
                    e.printStackTrace();
                    continue;
                }
                visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                num++;
            } while (BDBFrontier.threads > 0 && num < 1000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(4) Multithreading via the Runnable interface.

……

public class MyCrawlerByThread extends MyCrawler implements Runnable {
    private int threadId;

    public MyCrawlerByThread(int id) {
        this.threadId = id;
    }

    /**
     * (non-Javadoc)
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            crawling(new String[]{CrawlConfig.CRAWL_PATH}, threadId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            long startTime = System.currentTimeMillis();
            System.out.println("crawl started");
            ArrayList<Thread> threadList = new ArrayList<Thread>(CrawlConfig.CRAWL_THREAD_NUM);
            for (int i = 0; i < CrawlConfig.CRAWL_THREAD_NUM; i++) {
                MyCrawlerByThread crawler = new MyCrawlerByThread(i);
                Thread t = new Thread(crawler);
                t.start();
                threadList.add(t);
                Thread.sleep(10L);
            }
            while (threadList.size() > 0) {
                Thread child = (Thread) threadList.remove(0);
                child.join();
            }
            System.out.println("crawl finished");
            long endTime = System.currentTimeMillis();
            System.out.println("elapsed time: " + (endTime - startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
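Thread management could just as well be handed to an ExecutorService instead of starting Thread objects by hand; a sketch of the same startup (not part of the original demo):

// Run the crawler tasks on a fixed-size thread pool.
ExecutorService pool = Executors.newFixedThreadPool(CrawlConfig.CRAWL_THREAD_NUM);
for (int i = 0; i < CrawlConfig.CRAWL_THREAD_NUM; i++) {
    pool.execute(new MyCrawlerByThread(i));
}
pool.shutdown();                              // no more tasks will be submitted
pool.awaitTermination(1, TimeUnit.HOURS);     // wait for the crawl to finish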

Output:

crawl started

……

crawl finished

elapsed time: 25777ms

Finally, crawl performance was analysed by running the crawl three ways in turn: a single-threaded LinkQueue, a single-threaded BerkeleyDB frontier, and the multithreaded BerkeleyDB version, and comparing the measured figures.

In short, multithreading brings a clear performance improvement.

© Copyright belongs to the original author; reposted from the original work of 51CTO blogger 清茶问道.
