EasySpider Crawler Architecture Design (Beginner Edition)
1. Preface
- When it comes to data collection, everyone thinks of crawlers, and when it comes to crawlers, there are already many powerful open-source options on the market, from single-machine to distributed, each one competing with the next.
- For learning purposes, though, I still think it is worth trying to design a crawler architecture yourself. You may have no idea where to start; in that case you can study excellent projects such as Scrapy (the Python crawler) and WebMagic (the Java crawler). WebMagic's design ideas are themselves partly derived from Scrapy.
- The architecture I introduce here may be crude, but it is something I thought through, designed, and put into practice myself, and it also borrows WebMagic's modular design ideas.
2. Modules
Module structure
- The architecture is split into five modules:
- 1. spider: the module the user works with; it configures the seed URLs, proxies, data parsing, and so on.
- 2. scheduler: manages and schedules the request URLs. For now it only implements basic storing and fetching; later it will be extended into a module that supports various scheduling strategies.
- 3. download: the download module, responsible for fetching resources, caching data, and so on.
- 4. pipeline: the data channel. Its implementation is very simple; it is used to store or analyze the extracted data.
- 5. engine: the engine, responsible for driving the other four kinds of modules.
3. Code Walkthrough
- Create four Base classes that define the standard methods and fields for spider, scheduler, download, and pipeline respectively.
- BaseSpider
from lxml import etree

from com.anjie.e_request import ERequest


class BaseSpider:
    def __init__(self):
        super(BaseSpider, self).__init__()
        self.seed_url = None        # list of seed URLs, set by the concrete spider
        self.pipeline_item = None   # extracted items handed to the pipeline
        self.requests = []          # follow-up requests produced while parsing

    # wrap the extracted URLs into ERequest objects
    def addRequest_urls(self, urls):
        if urls:
            for r in map(lambda url: ERequest(url), urls):
                self.requests.append(r)

    # to be overridden by the concrete spider: parse one page
    def pagerProcess(self, page):
        pass

    # called by the engine with the downloaded html
    def pagerBack(self, html):
        page = etree.HTML(html)
        self.pagerProcess(page)
        requests = None
        if len(self.requests) > 0:
            requests = self.requests.copy()
            self.requests.clear()   # avoid handing the same requests back twice
        return (self.pipeline_item, requests)

    # return the seed requests
    def getSeeds(self):
        result = []
        for url in self.seed_url:
            r = ERequest(url=url)
            result.append(r)
        return result
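BaseSpider builds ERequest objects (com.anjie.e_request), which the original post does not list. Judging from how they are used (constructed from a url, with rq.url and rq.headers read later by the downloader), a minimal sketch might look like this; the default User-Agent value is purely an illustrative assumption:

```python
# com/anjie/e_request.py -- minimal sketch, not the original implementation.
# Only url and headers are assumed, since those are the attributes the
# other modules actually access.
class ERequest:
    def __init__(self, url, headers=None):
        self.url = url
        # illustrative default; replace with whatever User-Agent you prefer
        self.headers = headers or {'User-Agent': 'EasySpider/0.1'}
```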
- BaseScheduler
class BaseScheduler:
    def __init__(self):
        super(BaseScheduler, self).__init__()

    def __iter__(self):
        return self

    def __next__(self):
        pass

    def nextRequest(self):
        pass

    def addRequest(self, rq):
        pass

    def addRequests(self, rqs):
        pass

    def addLoserRequest(self, rq):
        pass
- BaseDownload
# Created by zaizai at 2017/9/22
class BaseDownload:
    def __init__(self):
        super(BaseDownload, self).__init__()

    def excuteRequest(self, rq):
        pass
- BasePipeline
class BasePipeline:
    def __init__(self):
        super(BasePipeline, self).__init__()

    def piplineData(self, data):
        pass
- OK. If we later need to implement different crawlers, downloaders, schedulers, or data pipelines, we do so by inheriting from the Base classes above, which keeps the interfaces consistent.
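For example, a pipeline that appends every extracted item to a JSON-lines file only needs to subclass BasePipeline and override piplineData. This is just a sketch; the class name and file name are my own choices, not part of the project:

```python
import json

from com.anjie.base.pipeline import BasePipeline


class JsonFilePipeline(BasePipeline):
    """Sketch of a custom pipeline: writes each extracted item as one JSON line."""

    def __init__(self, path='items.jsonl'):
        super(JsonFilePipeline, self).__init__()
        self.path = path

    def piplineData(self, data):
        with open(self.path, 'a', encoding='utf-8') as f:
            for v in data:
                # items are plain data objects, so dump their __dict__
                f.write(json.dumps(v.__dict__, ensure_ascii=False, default=str) + '\n')
```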
- OK, let's take a look at the current directory layout:
- base: holds the Base classes
- mode: holds some type/structure classes defined along the way
- module: holds the default implementations, such as the default downloader, the default scheduler, and the default pipeline
- spider: holds the spider code
- utils: holds helper utility classes (such as Elog and Throttle; a sketch of Elog follows this list)
- At the outermost level sits our engine class.
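The Elog helper is used by every module that follows but is not listed in the post; I assume it is simply a thin wrapper around the standard logging module, roughly like this:

```python
# com/anjie/utils/elog.py -- assumed implementation: a thin wrapper
# around the standard logging module.
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] %(message)s')
_logger = logging.getLogger('easyspider')


class Elog:
    info = staticmethod(_logger.info)
    warning = staticmethod(_logger.warning)
    error = staticmethod(_logger.error)
```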
- Let's look at the engine class first; the code is still fairly simple.
# Created by zaizai at 2017/9/21
from com.anjie.module.default_download import DefaultDownload
from com.anjie.module.default_scheduler import DefaultScheduler
from com.anjie.utils.elog import Elog
from com.anjie.spider.myspider import Spider


class Engine:
    def __init__(self, spider=None, scheduler=DefaultScheduler(), download=DefaultDownload(), pipline=None):
        super(Engine, self).__init__()
        self.spider = spider
        self.scheduler = scheduler
        self.download = download
        self.pipline = pipline

    def addSpider(self, spider):
        self.spider = spider

    def start(self):
        # feed the seed requests into the scheduler
        self.scheduler.addRequests(self.spider.getSeeds())
        while True:
            rq = self.scheduler.nextRequest()
            if rq is None:
                Elog.warning('Engine will stop because the scheduler has no more requests to schedule')
                break
            resultPage = self.download.excuteRequest(rq)
            if resultPage is not None:
                (pipelineItems, nextRequests) = self.spider.pagerBack(resultPage)
                if pipelineItems and self.pipline:
                    self.pipline.piplineData(pipelineItems)
                if nextRequests:
                    self.scheduler.addRequests(nextRequests)
            else:
                # decide whether the request should be re-queued for retry
                pass

    def sleep(self, time):
        pass

    def stop(self):
        pass


if __name__ == '__main__':
    e = Engine()
    e.addSpider(Spider())
    e.start()
- The engine first gets the seed URLs from the spider and hands them to the scheduler, then enters a while loop that keeps taking requests from the scheduler. The code is straightforward.
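Note that the __main__ block above passes no pipeline, so the extracted data is never handed anywhere. To have it handled, construct the engine with a pipeline explicitly; the sketch below could replace the __main__ block of the engine module (Engine and Spider are already in scope there), and the DefaultPipeline import path is an assumption that mirrors the naming of default_download and default_scheduler:

```python
# assumed import path, mirroring the other default modules
from com.anjie.module.default_pipeline import DefaultPipeline

if __name__ == '__main__':
    e = Engine(spider=Spider(), pipline=DefaultPipeline())
    e.start()
```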
- Now let's look at the default modules.
- The default downloader:
import random
from urllib import request, error

from com.anjie.base.download import BaseDownload
from com.anjie.utils.elog import Elog
from com.anjie.utils.throttle import Throttle

# default delay between requests to the same domain (seconds)
DEFAULT_DELAY = 5
# default number of retries
DEFAULT_RETRIES = 1
# default timeout (seconds)
DEFAULT_TIMEOUT = 60


class DefaultDownload(BaseDownload):
    def __init__(self, num_retries=DEFAULT_RETRIES, cache=None, proxies=None, delay=DEFAULT_DELAY):
        super(DefaultDownload, self).__init__()
        # retry count
        self.num_retries = num_retries
        # cache
        self.cache = cache
        # proxies
        self.proxies = proxies
        self.throttle = Throttle(delay)

    def excuteRequest(self, rq):
        return self.download(rq)

    def download(self, rq):
        print('download url is %s' % rq.url)
        result = None
        if self.cache is not None:
            try:
                result = self.cache[rq.url]
            except KeyError:
                Elog.warning('url %s is not available in cache' % rq.url)
            else:
                # reached only when the cache lookup succeeded
                if result is not None and self.num_retries > 0 and 500 <= result['code'] < 600:
                    # the cached response was a server error, so re-download
                    Elog.info('server error, so ignore cached result of url %s and re-download' % rq.url)
                    result = None
        if result is not None:
            return result['html']
        # no cache configured, or the url is not cached yet: download it
        Elog.info("url %s hasn't been cached, so it still needs to be downloaded" % rq.url)
        proxy = None
        if self.proxies is not None:
            proxy = random.choice(self.proxies)
        result = self.realDownload(rq, proxy=proxy, num_retries=self.num_retries)
        if self.cache is not None:
            # save the result to the cache
            self.cache[rq.url] = result
        return result['html']

    def realDownload(self, rq, proxy, num_retries, data=None):
        # honour the per-domain crawl delay
        self.throttle.wait(rq.url)
        html = None
        code = None
        try:
            req = request.Request(url=rq.url, headers=rq.headers)
            rp = request.urlopen(req)
            Elog.info('download over, url is: %s' % rp.geturl())
            html = rp.read()
            code = rp.code
        except error.URLError as e:
            Elog.error('download error :%s' % e.reason)
            html = ''
            if num_retries > 0:
                if hasattr(e, 'code') and 500 <= e.code < 600:
                    return self.realDownload(rq, proxy, num_retries - 1)
        return {'html': html, 'code': code}


if __name__ == '__main__':
    from com.anjie.e_request import ERequest
    d = DefaultDownload()
    print(d.download(ERequest(url='http://www.baidu.com')))
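DefaultDownload depends on a Throttle helper (com.anjie.utils.throttle) that the post does not show. Its job is to space out requests to the same domain; a minimal sketch consistent with the Throttle(delay) / throttle.wait(url) usage above could be:

```python
# com/anjie/utils/throttle.py -- sketch: delay successive requests to
# the same domain by `delay` seconds.
import time
from urllib import parse


class Throttle:
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}  # domain -> timestamp of the last request

    def wait(self, url):
        domain = parse.urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (time.time() - last_accessed)
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = time.time()
```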
- The default scheduler:
from urllib import parse
from urllib.robotparser import RobotFileParser

from com.anjie.base.scheduler import BaseScheduler
from com.anjie.mode.spider_exception import SpiderException
from com.anjie.utils.elog import Elog


class DefaultScheduler(BaseScheduler):
    def __init__(self):
        super(DefaultScheduler, self).__init__()
        # queue of requests waiting to be crawled
        self.belle_queue = list()
        # queue of requests that failed to download
        self.loser_queue = []
        # robots.txt data cache
        self.rp_cache_queue = []
        # requests disallowed by robots.txt
        self.robots_loser_queue = []
        # urls that have already been crawled
        self.crawl_over_queue = []
        self.rp_cache = dict()

    def __iter__(self):
        return self

    def __next__(self):
        if not self.belle_queue:
            raise StopIteration
        return self.belle_queue.pop()

    def nextRequest(self):
        if not self.belle_queue:
            return None
        return self.belle_queue.pop()

    def addRequest(self, rq):
        self.belle_queue.append(rq)

    def addRequests(self, rqs):
        self.belle_queue.extend(rqs)

    def addLoserRequest(self, rq):
        self.loser_queue.append(rq)

    # legacy crawl loop from before the Engine existed; note that it references
    # self.spider / self.download / self.pipline / self.crawl_queue, which this
    # class does not set
    def start_craw(self):
        if not self.spider:
            raise SpiderException("spider object is None")
        if not hasattr(self.spider, 'start_url'):
            raise SpiderException("spider must have a start_url attribute")
        if not hasattr(self.spider, 'pager_back'):
            raise SpiderException("spider must have a pager_back method")
        self.crawl_queue.extend(self.spider.start_url)
        while self.crawl_queue:
            url = self.crawl_queue.pop()
            html = None
            # let's be a polite crawler and respect robots.txt
            rp = self.get_robots(url)
            if rp is not None:
                if rp.can_fetch(useragent=self.download.user_agent, url=url):
                    html = self.download.download(url=url)
                else:
                    Elog.warning(
                        'current url : %s and user_agent: %s is disallowed by robots.txt' % (url, self.user_agent))
                    html = None
            if html:
                self.crawl_over_queue.append(url)
                pipeline_item = self.spider.pager_back(url, html)
                print(pipeline_item)
                if self.pipline and pipeline_item and pipeline_item['item']:
                    self.pipline.piplineData(pipeline_item['item'])
                # queue any newly discovered urls that have not been seen yet
                if pipeline_item['url']:
                    for r in pipeline_item['url']:
                        if r not in self.crawl_over_queue and r not in self.crawl_queue:
                            self.crawl_queue.append(r)

    # parse the robots.txt file of the url's domain
    def get_robots(self, url):
        """Initialize robots parser for this domain"""
        (proto, rest) = parse.splittype(url)
        # get the domain
        res, rest = parse.splithost(rest)
        try:
            return self.rp_cache[res]
        except KeyError:
            pass
        rp = RobotFileParser()
        rp.set_url(parse.urljoin(url, '/robots.txt'))
        rp.read()
        if self.rp_cache is not None:
            if res:
                self.rp_cache[res] = rp
            else:
                Elog.info('url:%s failed to parse domain' % url)
        return rp
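As mentioned in the module overview, the scheduler is where different scheduling strategies will eventually plug in. belle_queue.pop() takes requests from the tail of the list, which behaves roughly depth-first; a breadth-first (FIFO) strategy only needs to change how requests are taken out. The class below is an illustration of that extension point, not part of the project:

```python
from collections import deque

from com.anjie.module.default_scheduler import DefaultScheduler


class FifoScheduler(DefaultScheduler):
    """Sketch of a breadth-first scheduling strategy: first in, first out."""

    def __init__(self):
        super(FifoScheduler, self).__init__()
        self.belle_queue = deque()  # deque gives O(1) pops from the left

    def nextRequest(self):
        if not self.belle_queue:
            return None
        return self.belle_queue.popleft()
```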
- The default pipeline:
from com.anjie.base.pipeline import BasePipeline


class DefaultPipeline(BasePipeline):
    def __init__(self):
        super(DefaultPipeline, self).__init__()

    def piplineData(self, data):
        for v in data:
            print("<------------------------->")
            print('\n'.join(['%15s : %s' % item for item in v.__dict__.items()]))
- Finally, let's look at an example spider that drives the crawler, using rental-housing listings as the target.
# Created by zaizai at 2017/9/22
from com.anjie.base.spider import BaseSpider
from com.anjie.spider.house import House


class Spider(BaseSpider):
    def __init__(self):
        super(Spider, self).__init__()
        self.seed_url = ['https://gz.zu.anjuke.com/?from=navigation']

    def pagerProcess(self, page):
        list_result = page.xpath('//div[@class="maincontent"]//div[@class="zu-itemmod "]')
        result_list = []
        for node in list_result:
            h = House()
            # extract the title
            title = node.xpath('.//div[@class="zu-info"]/h3/a/text()')
            h.title = title
            # extract the link
            url = node.xpath('.//div[@class="zu-info"]/h3/a/@href')
            h.url = url
            temp = node.xpath('.//div[@class="zu-info"]/p[1]/text()')
            (house_type, sale_type, level, floor_number) = temp
            h.house_type = house_type
            h.sale_type = sale_type
            h.level = level
            h.floor_number = floor_number
            # extract the address
            area_name = node.xpath('.//div[@class="zu-info"]/address/a/text()')
            h.area_name = area_name
            area = node.xpath('.//div[@class="zu-info"]/address/text()')
            for ele in area:
                if len(ele.strip()) > 0:
                    area = ele.strip()
                    break
            h.addr = area
            # extract the contact person
            user = node.xpath('.//div[@class="zu-info"]/p[2]/span/text()')
            h.user = user
            supplement = node.xpath('.//div[@class="zu-info"]/p[2]/em/text()')
            h.supplement = supplement
            # extract the price
            price = node.xpath('.//div[@class="zu-side"]//strong/text()')
            h.price = price
            unit = node.xpath('.//div[@class="zu-side"]/p/text()')
            h.unit = unit
            result_list.append(h)
        links = page.xpath('//*[@class="multi-page"]/a/@href')
        # use a set to filter out duplicate links
        next_link = list(set(links))
        print('extracted links: %s' % next_link)
        self.pipeline_item = result_list
        self.addRequest_urls(next_link)
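The House class (com.anjie.spider.house) is just a data holder and is not listed in the post; based on the attributes the spider assigns, it is presumably something like this:

```python
# com/anjie/spider/house.py -- assumed shape of the data holder; every
# field below is set by Spider.pagerProcess and printed by the pipeline.
class House:
    def __init__(self):
        self.title = None
        self.url = None
        self.house_type = None
        self.sale_type = None
        self.level = None
        self.floor_number = None
        self.area_name = None
        self.addr = None
        self.user = None
        self.supplement = None
        self.price = None
        self.unit = None
```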
- And with that, the single-machine, single-threaded version of the crawler architecture is complete.
- Although it is only single-machine and single-threaded, this design makes it straightforward to extend later to multi-process, multi-threaded, or even distributed crawling.
- For example, we could run multiple spiders, each mapped by its spider name to a dedicated pipeline, while the download and scheduler modules are shared. To support that, the engine is essentially the only part that needs to change; a rough sketch follows.
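Here is that idea sketched out: one shared scheduler and downloader, with pipelines looked up by spider name. Both spider.name and rq.spider_name are assumptions of mine; neither BaseSpider nor ERequest defines them yet.

```python
# Sketch only: not part of the current project.
class MultiSpiderEngine:
    def __init__(self, scheduler, download):
        self.scheduler = scheduler   # shared by all spiders
        self.download = download     # shared by all spiders
        self.spiders = {}            # spider name -> spider
        self.pipelines = {}          # spider name -> pipeline

    def addSpider(self, spider, pipeline):
        self.spiders[spider.name] = spider
        self.pipelines[spider.name] = pipeline
        seeds = spider.getSeeds()
        for rq in seeds:
            rq.spider_name = spider.name  # remember which spider owns the request
        self.scheduler.addRequests(seeds)

    def start(self):
        while True:
            rq = self.scheduler.nextRequest()
            if rq is None:
                break
            result_page = self.download.excuteRequest(rq)
            if result_page is None:
                continue
            spider = self.spiders[rq.spider_name]
            (items, next_requests) = spider.pagerBack(result_page)
            if items:
                self.pipelines[rq.spider_name].piplineData(items)
            if next_requests:
                for nxt in next_requests:
                    nxt.spider_name = rq.spider_name
                self.scheduler.addRequests(next_requests)
```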
- I am still feeling my way through all of this; please point out any mistakes or omissions.