
EasySpider Crawler Architecture Design: Qi-Refining Edition

I. Preface

  • When it comes to data collection, people immediately think of crawlers; and when it comes to crawlers, there are plenty of powerful open-source ones on the market today, from single-machine to distributed, each trying to outshine the others.
  • For learning purposes, though, I still think it is worth trying to design a crawler architecture yourself. You may have no idea where to start at first; in that case you can study excellent projects such as the Python crawler framework Scrapy or the Java crawler WebMagic (part of WebMagic's design philosophy in turn comes from Scrapy).
  • The architecture I introduce here may be rough, but it really is something I thought through, designed, and put into practice myself, borrowing WebMagic's modular approach along the way.

II. Modules

Module structure

  • It is divided into five main modules:
  • 1. spider: the module that interacts with the user; it configures the start URLs, proxies, data parsing, and so on.
  • 2. scheduler: manages and schedules the request links; for now it only implements basic storing and fetching, and will later be extended into a module supporting various scheduling strategies.
  • 3. download: the download module, responsible for fetching resources and caching data.
  • 4. pipeline: the data pipeline; its implementation is very simple and is used to store or analyze the extracted data.
  • 5. engine: the engine, responsible for driving the other four kinds of modules.

III. Code Walkthrough

  • Create four Base classes that define the method and field contracts for spider, scheduler, download, and pipeline respectively.

  • BaseSpider

      from lxml.html import etree;
      from com.anjie.e_request import ERequest;
    
      class BaseSpider:
      	def __init__(self):
      		super(BaseSpider, self).__init__();
      		self.seed_url = None;
      		self.pipeline_item = None;
      		self.requests = [];
    
      	def addRequest_urls(self, urls):
      		if urls:
      			for r in map(lambda url:ERequest(url), urls):
      				self.requests.append(r);
    
      	def pagerProcess(self, page):
      		pass;
    
      	def pagerBack(self, html):
      		page = etree.HTML(html);
      		self.pagerProcess(page);
      		item = self.pipeline_item;
      		requests = None;
      		if len(self.requests) > 0:
      			requests = self.requests.copy();
      			# hand the buffered requests off once, so they are not re-scheduled again for the next page
      			self.requests.clear();

      		return (item, requests)
    
      	# return the seed requests
      	def getSeeds(self):
      		result = [];
      		for url in self.seed_url:
      			r = ERequest(url=url);
      			result.append(r);
      		return result;
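
  • One thing the post never shows is the ERequest class imported from com.anjie.e_request. As a minimal sketch (my assumption, not the project's actual file), it only needs to carry a url plus a headers dict, because those are the two attributes the downloader reads:

      # hypothetical sketch of com/anjie/e_request.py
      DEFAULT_AGENT = 'EasySpider/0.1'

      class ERequest:
      	def __init__(self, url, headers=None):
      		# the downloader reads rq.url and rq.headers, so both must always exist
      		self.url = url
      		self.headers = headers if headers is not None else {'User-Agent': DEFAULT_AGENT}

      	def __repr__(self):
      		return 'ERequest(%r)' % self.url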
    
  • BaseScheduler

      class BaseScheduler:
      	def __init__(self):
      		super(BaseScheduler, self).__init__();
    
      	def __iter__(self):
      		return self
    
      	def __next__(self):
      		pass;
    
      	def nextRequest(self):
      		pass
    
      	def addRequest(self, rq):
      		pass

      	def addRequests(self, rqs):
      		pass;
    
      	def addLoserRequest(self, rq):
      		pass;
    
  • BaseDownload

      # Created by zaizai at 2017/9/22
    
      class BaseDownload:
      	def __init__(self):
      		super(BaseDownload, self).__init__();
    
      	def excuteRequest(self, rq):
      		pass;
    
  • BasePipeline

      class BasePipeline:
      	def __init__(self):
      		super(BasePipeline,self).__init__();
    
      	def piplineData(self, data):
      		pass
    
  • OK — if we later need to implement different spiders, downloaders, schedulers, or data pipelines, we simply inherit from the Base classes above; they serve as the contract between modules.
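
  • For example, a custom pipeline only has to subclass BasePipeline and override piplineData. Below is a minimal sketch (a hypothetical JsonLinesPipeline, not part of the project) that appends each extracted item as one JSON line, assuming items expose their fields via __dict__ just as the default pipeline does:

      import json

      from com.anjie.base.pipeline import BasePipeline

      class JsonLinesPipeline(BasePipeline):
      	def __init__(self, path='items.jl'):
      		super(JsonLinesPipeline, self).__init__()
      		self.path = path

      	def piplineData(self, data):
      		# write every extracted item as one JSON object per line
      		with open(self.path, 'a', encoding='utf-8') as f:
      			for item in data:
      				f.write(json.dumps(item.__dict__, ensure_ascii=False) + '\n')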

  • OK, let's look at the current directory structure:

[image: project directory structure — //ow3d01r1a.bkt.clouddn.com//file/2017/9/07116946935c4f9ebfd94dedf9109c62-image.png]

  • base: holds the base classes

  • mode: holds the type/model structures defined while building the project

  • module: holds the default implementations, such as the default downloader, default scheduler, and default pipeline

  • spider: holds the crawler code

  • utils: holds the helper utility classes

  • The outermost level is our engine class.

  • Let's look at the engine class first; the code is still fairly simple at this point.

      # Created by zaizai at 2017/9/21
    
      from com.anjie.module.default_download import DefaultDownload;
      from com.anjie.module.default_scheduler import DefaultScheduler;
      from com.anjie.utils.elog import Elog;
      from com.anjie.spider.myspider import Spider
    
      class Engine:
    
      	def __init__(self, spider=None, scheduler=DefaultScheduler(), download=DefaultDownload(), pipline=None):
      		super(Engine, self).__init__();
      		self.spider = spider;
      		self.scheduler = scheduler;
      		self.download = download;
      		self.pipline = pipline;
    
      	def addSpider(self, spider):
      		self.spider = spider;
    
      	def start(self):
      		self.scheduler.addRequests(self.spider.getSeeds())
      		while True:
      			rq = self.scheduler.nextRequest();
      			if rq is None:
      				Elog.warning('Engine will stop because the scheduler has no more requests to schedule');
      				break;
      			resultPage = self.download.excuteRequest(rq);
      			if resultPage is not None:
      				(pipelineItems, nextRequests) = self.spider.pagerBack(resultPage);
      				if pipelineItems and self.pipline:
      					self.pipline.piplineData(pipelineItems);
      				if nextRequests:
      					self.scheduler.addRequests(nextRequests);
      			else:
      				# decide whether the request should be re-queued for another attempt
      				pass
    
      	def sleep(self, time):
      		pass;
    
      	def stop(self):
      		pass
    
      if __name__ == '__main__':
      	e = Engine();
      	e.addSpider(Spider());
      	e.start()
    
  • The engine first takes the seed URLs from the spider and feeds them into the scheduler, then runs a while loop that keeps pulling requests from the scheduler. The code is quite simple.
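
  • The Elog helper imported everywhere from com.anjie.utils.elog is not shown in the post either. A minimal sketch, assuming it is just a thin class-level wrapper over the standard logging module:

      # hypothetical sketch of com/anjie/utils/elog.py
      import logging

      logging.basicConfig(level=logging.INFO,
      					format='%(asctime)s [%(levelname)s] %(message)s')
      _logger = logging.getLogger('easyspider')

      class Elog:
      	# class-level methods so callers can write Elog.info(...) without creating an instance
      	info = staticmethod(_logger.info)
      	warning = staticmethod(_logger.warning)
      	error = staticmethod(_logger.error)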

  • Next, let's look at the default modules.

  • The default downloader:

      import random;
      from urllib import request, error
    
      from com.anjie.base.download import BaseDownload;
      from com.anjie.utils.elog import Elog
      from com.anjie.utils.throttle import Throttle;
    
      # default delay (seconds)
      DEFAULT_DELAY = 5
      # default number of retries
      DEFAULT_RETRIES = 1
      # default timeout (seconds)
      DEFAULT_TIMEOUT = 60
    
      class DefaultDownload(BaseDownload):
      	def __init__(self, num_retries=DEFAULT_RETRIES, cache=None, proxies=None,
      				 delay=DEFAULT_DELAY):
      		super(DefaultDownload, self).__init__();
      		# number of retries
      		self.num_retries = num_retries;
      		# cache
      		self.cache = cache;
      		# proxies
      		self.proxies = proxies;
      		self.throttle = Throttle(delay)
    
      	def excuteRequest(self, rq):
      		return self.download(rq)
    
      	def download(self, rq):
      		print('download url is %s' % rq.url);
      		result = None;
      		if self.cache is not None:
      			try:
      				result = self.cache[rq.url];
      			except KeyError as e:
      				Elog.warning('url %s is not available in the cache' % rq.url)
      				pass;
      			else:
      				# runs when the cache lookup raised no exception (cache hit)
      				if result is not None and self.num_retries > 0 and 500 <= result['code'] < 600:
      					# the cached response was a server error, so it is not usable
      					Elog.info("server error, so ignore the cached result of url %s and re-download" % rq.url);
      					result = None;

      		# a usable copy was found in the cache, return it directly
      		if result is not None:
      			return result['html'];

      		if result is None:
      			# no cache configured, or the cache holds no data for this url
      			Elog.info("url %s has no cached copy, so it still needs to be downloaded" % rq.url);
      			proxy = None;
      			if self.proxies is not None:
      				proxy = random.choice(self.proxies);
      			result = self.realDownload(rq, proxy=proxy,
      									   num_retries=self.num_retries);
      		if self.cache is not None:
      			# save the downloaded data to the cache
      			self.cache[rq.url] = result;
      		return result['html'];
    
      	def realDownload(self, rq, proxy, num_retries, data=None):
    
      		# throttle requests to the same domain
      		self.throttle.wait(rq.url)
    
      		html = None;
      		code = None;
      		try:
      			# note: the proxy argument is accepted but not yet applied to the request
      			req = request.Request(url=rq.url, headers=rq.headers);
      			rp = request.urlopen(req);
      			Elog.info('download over, url is: %s' % rp.geturl())
      			html = rp.read();
      			code = rp.code;
      		except error.URLError as e:
      			Elog.error('download error: %s' % e.reason);
      			html = '';
      			if num_retries > 0:
      				if hasattr(e, 'code') and 500 <= e.code < 600:
      					# retry server errors with a decremented retry budget
      					return self.realDownload(rq, proxy, num_retries - 1);
      		return {'html': html, 'code': code};
    
      if __name__ == '__main__':
      	from com.anjie.e_request import ERequest;
      	d = DefaultDownload();
      	print(d.download(ERequest(url='http://www.baidu.com')))
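
  • DefaultDownload also depends on a Throttle class from com.anjie.utils.throttle that the post does not list. A minimal sketch, assuming it simply keeps requests to the same domain at least `delay` seconds apart:

      # hypothetical sketch of com/anjie/utils/throttle.py
      import time
      from urllib.parse import urlparse

      class Throttle:
      	def __init__(self, delay):
      		self.delay = delay
      		# domain -> timestamp of the last request to that domain
      		self.last_accessed = {}

      	def wait(self, url):
      		domain = urlparse(url).netloc
      		last = self.last_accessed.get(domain)
      		if self.delay > 0 and last is not None:
      			sleep_seconds = self.delay - (time.time() - last)
      			if sleep_seconds > 0:
      				time.sleep(sleep_seconds)
      		self.last_accessed[domain] = time.time()

  • Note that the cache argument only needs to behave like a dict keyed by url (the downloader stores {'html': ..., 'code': ...} under each url), so a plain dict or any shelve-like mapping can be plugged in, e.g. DefaultDownload(cache={}).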
    
  • The default scheduler:

      from urllib import parse;
      from urllib.robotparser import RobotFileParser;
    
      from com.anjie.base.scheduler import BaseScheduler
      from com.anjie.mode.spider_exception import SpiderException;
      from com.anjie.utils.elog import Elog
    
      class DefaultScheduler(BaseScheduler):
      	def __init__(self):
      		super(DefaultScheduler, self).__init__();
    
      		# queue of requests waiting to be crawled
      		self.belle_queue = list();

      		# queue of failed downloads
      		self.loser_queue = [];

      		# cache of robots data
      		self.rp_cache_queue = [];

      		# queue of urls disallowed by robots.txt
      		self.robots_loser_queue = []

      		# urls already crawled
      		self.crawl_over_queue = [];
    
      		self.rp_cache = dict();
    
      	def __iter__(self):
      		return self
    
      	def __next__(self):
      		# pop() on an empty list raises IndexError, so check first
      		if not self.belle_queue:
      			raise StopIteration
      		return self.belle_queue.pop();

      	def nextRequest(self):
      		if not self.belle_queue:
      			return None;
      		return self.belle_queue.pop();
    
      	def addRequest(self, rq):
      		pass;
    
      	def addRequests(self, rqs):
      		self.belle_queue.extend(rqs);
    
      	def addLoserRequest(self, rq):
      		# record the failed request in the failure queue
      		self.loser_queue.append(rq);
    
      	def start_craw(self):
      		if not self.spider:
      			raise SpiderException("spider object is None")

      		if not hasattr(self.spider, 'start_url'):
      			raise SpiderException("spider must have a start_url attribute")

      		if not hasattr(self.spider, 'pager_back'):
      			raise SpiderException("spider must have a pager_back method")

      		self.crawl_queue.extend(self.spider.start_url);
      		# initialize the request count of every url to 0

      		while self.crawl_queue:
      			url = self.crawl_queue.pop();

      			html = None;
      			# let's be a polite crawler and respect robots.txt
      			rp = self.get_robots(url);
    
      			if rp is not None:
      				if rp.can_fetch(useragent=self.download.user_agent, url=url):
      					html = self.download.download(url=url);
      				else:
      					Elog.warning(
      						'url %s with user_agent %s is disallowed by robots.txt' % (url, self.user_agent))
      					html = None;
    
      			if html:
      				self.crawl_over_queue.append(url);
    
      			pipeline_item = self.spider.pager_back(url, html);
      			# check whether new request urls were added
      			print(pipeline_item)
      			if self.pipline and pipeline_item and pipeline_item['item']:
      				self.pipline.piplineData(pipeline_item['item']);
    
      			if pipeline_item['url']:
      				for r in pipeline_item['url']:
      					if not r in self.crawl_over_queue and r not in self.crawl_queue:
      						self.crawl_queue.append(r);
    
      	# parse the robots.txt file for a url's domain
      	def get_robots(self, url):
      		"""Initialize robots parser for this domain"""
      		(proto, rest) = parse.splittype(url)
      		# extract the host part into res
      		res, rest = parse.splithost(rest)
      		rp = None;
      		try:
      			rp = self.rp_cache[res];
      		except KeyError as e:
      			Elog.info('no cached robots parser for %s yet' % res);
      			pass;
      		else:
      			return rp;
    
      		rp = RobotFileParser()
      		rp.set_url(parse.urljoin(url, '/robots.txt'))
      		rp.read()
    
      		if self.rp_cache is not None:
      			if res:
      				self.rp_cache[res] = rp;
      			else:
      				Elog.info('url: %s failed to parse out a domain' % url);
    
      		return rp
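
  • The scheduler is also the natural place to plug in different scheduling strategies. DefaultScheduler pops from the tail of its list, which gives roughly depth-first ordering; a breadth-first variant is a small subclass. The FifoScheduler below is a hypothetical sketch, not part of the project:

      from collections import deque

      from com.anjie.base.scheduler import BaseScheduler

      class FifoScheduler(BaseScheduler):
      	# breadth-first scheduling: requests come out in the order they were added
      	def __init__(self):
      		super(FifoScheduler, self).__init__()
      		self.queue = deque()

      	def nextRequest(self):
      		return self.queue.popleft() if self.queue else None

      	def addRequest(self, rq):
      		self.queue.append(rq)

      	def addRequests(self, rqs):
      		self.queue.extend(rqs)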
    
  • The default pipeline:

      from com.anjie.base.pipeline import BasePipeline;
    
      class DefaultPipeline(BasePipeline):
      	def __init__(self):
      		super(DefaultPipeline, self).__init__();
      		pass;
    
      	def piplineData(self, data):
      		for v in data:
      			print("<------------------------->")
      			print('\n'.join(['%15s : %s' % item for item in v.__dict__.items()]))
    
  • Finally, here is an example of driving the crawler, taking rental housing listings as the target.

      from com.anjie.base.spider import BaseSpider;
      from com.anjie.spider.house import House;
    
      # Created by zaizai at 2017/9/22
    
      class Spider(BaseSpider):
      	def __init__(self):
      		super(Spider, self).__init__();
      		self.seed_url = ['https://gz.zu.anjuke.com/?from=navigation'];
    
      	def pagerProcess(self, page):
      		next_link = [];
      		list_result = page.xpath('//div[@class="maincontent"]//div[@class="zu-itemmod  "]')
      		house_list = [];
      		house = None;
      		result_list = [];
      		for node in list_result:
      			h = House();
      			# print(etree.tostring(node, encoding="utf-8", pretty_print=True, method="html").decode())
      			# extract the title
      			title = node.xpath('.//div[@class="zu-info"]/h3/a/text()');
      			h.title = title;
      			# extract the link
      			url = node.xpath('.//div[@class="zu-info"]/h3/a/@href')
      			h.url = url;
      			temp = node.xpath('.//div[@class="zu-info"]/p[1]/text()')
      			(house_type, sale_type, level, floor_number) = temp;
      			h.house_type = house_type;
      			h.sale_type = sale_type;
      			h.level = level;
      			h.floor_number = floor_number;
      			# extract the address
      			area_name = node.xpath('.//div[@class="zu-info"]/address/a/text()')
      			h.area_name = area_name;
      			area = node.xpath('.//div[@class="zu-info"]/address/text()')
      			for ele in area:
      				if len(ele.strip()) > 0:
      					area = ele.strip();
      					break;
      			h.addr = area;
      			# extract the contact person
      			user = node.xpath('.//div[@class="zu-info"]/p[2]/span/text()')
      			h.user = user;
      			supplement = node.xpath('.//div[@class="zu-info"]/p[2]/em/text()')
      			h.supplement = supplement;
    
      			# extract the price
      			price = node.xpath('.//div[@class="zu-side"]//strong/text()')
    
      			h.price = price;
      			unit = node.xpath('.//div[@class="zu-side"]/p/text()')
      			h.unit = unit;
      			result_list.append(h);
    
      		links = page.xpath('//*[@class="multi-page"]/a/@href');
      		# use a set to filter out duplicate links
      		s = set(links)
      		next_link = list(s);
      		print('extracted links: %s' % next_link)
      		self.pipeline_item = result_list;
      		self.addRequest_urls(next_link);
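
  • The House model imported from com.anjie.spider.house is not listed in the post. Since the spider assigns its attributes one by one and the default pipeline just prints __dict__, a plain data holder is enough; the sketch below is my assumption of its shape:

      # hypothetical sketch of com/anjie/spider/house.py
      class House:
      	def __init__(self):
      		self.title = None
      		self.url = None
      		self.house_type = None
      		self.sale_type = None
      		self.level = None
      		self.floor_number = None
      		self.area_name = None
      		self.addr = None
      		self.user = None
      		self.supplement = None
      		self.price = None
      		self.unit = None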
    
  • And with that, we have a single-machine, single-threaded crawler architecture.

  • Although it is single-machine and single-threaded, this design makes it straightforward to extend later to multi-process, multi-threaded, or even distributed crawling.

  • For example, we could run multiple spiders, with each spider name mapped to its own pipeline while the download and scheduler modules are shared. To support that, the engine is essentially the only place that needs to change; a rough sketch follows.
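
  • A rough, hypothetical sketch of that idea (none of this exists in the project yet): the engine keeps a registry mapping each spider name to its pipeline, the scheduler and downloader stay shared, and every request is tagged with the name of the spider that produced it via a spider_name attribute.

      # hypothetical multi-spider engine: shared scheduler/downloader, pipeline chosen per spider name
      class MultiSpiderEngine:
      	def __init__(self, scheduler, download):
      		self.scheduler = scheduler
      		self.download = download
      		self.spiders = {}    # spider name -> spider instance
      		self.pipelines = {}  # spider name -> pipeline instance

      	def register(self, name, spider, pipeline):
      		self.spiders[name] = spider
      		self.pipelines[name] = pipeline
      		for rq in spider.getSeeds():
      			rq.spider_name = name  # remember which spider owns this request
      			self.scheduler.addRequest(rq)

      	def start(self):
      		while True:
      			rq = self.scheduler.nextRequest()
      			if rq is None:
      				break
      			page = self.download.excuteRequest(rq)
      			if page is None:
      				continue
      			spider = self.spiders[rq.spider_name]
      			items, next_requests = spider.pagerBack(page)
      			if items:
      				self.pipelines[rq.spider_name].piplineData(items)
      			for nrq in (next_requests or []):
      				nrq.spider_name = rq.spider_name
      				self.scheduler.addRequest(nrq)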

  • I am still feeling my way forward, so please point out any errors or omissions.