爬虫进阶:框架功能升级之分布式爬虫 | 爬虫 |《python学习之路》| python 技术论坛-380玩彩网官网入口
分布式爬虫
分布式爬虫原理
分布式爬虫设计原理:
多台服务器同时抓取数据,请求和指纹存储在同一个redis中
实现方案
利用redis实现队列
- 注意pickle模块的使用:如果将对象存入redis中,需要先将其序列化为二进制数据,取出后反序列化就可以再得到原始对象
- 接口定义一致性:利用redis使用一个queue,使其接口同python的内置队列接口一致,可以实现无缝转换
# scrapy_plus/queue.py
import time
import pickle
import redis
from six.moves import queue as basequeue
# redis队列默认配置
redis_queue_name = 'request_queue'
redis_queue_host = 'localhost'
redis_queue_port = 6379
redis_queue_db = 10
# 利用redis实现一个queue,使其接口同python的内置队列接口一致,可以实现无缝转换
class queue(object):
"""
a queue like message built over redis
"""
empty = basequeue.empty
full = basequeue.full
max_timeout = 0.3
def __init__(self, maxsize=0, name=redis_queue_name, host=redis_queue_host, port=redis_queue_port, db=redis_queue_db,
lazy_limit=true, password=none):
"""
constructor for redisqueue
maxsize: an integer that sets the upperbound limit on the number of
items that can be placed in the queue.
lazy_limit: redis queue is shared via instance, a lazy size limit is used
for better performance.
"""
self.name = name
self.redis = redis.strictredis(host=host, port=port, db=db, password=password)
self.maxsize = maxsize
self.lazy_limit = lazy_limit
self.last_qsize = 0
def qsize(self):
self.last_qsize = self.redis.llen(self.name)
return self.last_qsize
def empty(self):
if self.qsize() == 0:
return true
else:
return false
def full(self):
if self.maxsize and self.qsize() >= self.maxsize:
return true
else:
return false
def put_nowait(self, obj):
if self.lazy_limit and self.last_qsize < self.maxsize:
pass
elif self.full():
raise self.full
self.last_qsize = self.redis.rpush(self.name, pickle.dumps(obj))
return true
def put(self, obj, block=true, timeout=none):
if not block:
return self.put_nowait(obj)
start_time = time.time()
while true:
try:
return self.put_nowait(obj)
except self.full:
if timeout:
lasted = time.time() - start_time
if timeout > lasted:
time.sleep(min(self.max_timeout, timeout - lasted))
else:
raise
else:
time.sleep(self.max_timeout)
def get_nowait(self):
ret = self.redis.lpop(self.name)
if ret is none:
raise self.empty
return pickle.loads(ret)
def get(self, block=true, timeout=none):
if not block:
return self.get_nowait()
start_time = time.time()
while true:
try:
return self.get_nowait()
except self.empty:
if timeout:
lasted = time.time() - start_time
if timeout > lasted:
time.sleep(min(self.max_timeout, timeout - lasted))
else:
raise
else:
time.sleep(self.max_timeout)
通过配置文件选择是否启用分布式
# 项目文件夹/settings.py
......
# 设置调度器的内容是否要持久化
# 量个值:true和false
# 如果是true,那么就是使用分布式,同时也是基于请求的增量式爬虫
# 如果是false, 不会说那个redis队列,会使用python的set存储指纹和请求
scheduler_persist = false
修改调度器实现请求对象的持久化
#scrapy_plus/core/scheduler.py
from six.moves.queue import queue
from scrapy_plus.utils.queue import queue as reidsqueue
from scrapy_plus.conf.settings import scheduler_persist
class scheduler:
def __init__(self,collector):
self._filter_container = set()
if scheduler_persist: #如果使用分布式或者是持久化,使用redis的队列
self.queue = reidsqueue()
else:
self.queue = queue()
self.repeate_request_num = 0
需要实现分布式,除了请求对象需要存储在redis中,还需要对请求进行去重,包括自动退出判断问题(分布式的话通常不需要自动退出)
利用redis的集合类型实现去重
如果分布式中请求去重的去重容器各个服务器用的不是同一个,那么就无法达到去重的目的,因此这里同样的需要使用redis来实现去重容器,也就是把所有的去重指纹都存储在redis中
实现一个自定义的set:
# scrapy_plus/set.py
import redis
from scrapy_plus.conf import settings
class basefiltercontainer(object):
def add_fp(self, fp):
'''往去重容器添加一个指纹'''
pass
def exists(self, fp):
'''判断指纹是否在去重容器中'''
pass
class noramlfiltercontainer(basefiltercontainer):
'''利用python的集合类型'''
def __init__(self):
self._filter_container = set()
def add_fp(self, fp):
''''''
self._filter_container.add(fp)
def exists(self, fp):
'''判断指纹是否在去重容器中'''
if fp in self._filter_container:
return true
else:
return false
class redisfiltercontainer(basefiltercontainer):
redis_set_name = settings.redis_set_name
redis_set_host = settings.redis_set_host
redis_set_port = settings.redis_set_port
redis_set_db = settings.redis_set_db
def __init__(self):
self._redis = redis.strictredis(host=self.redis_set_host, port=self.redis_set_port ,db=self.redis_set_db)
self._name = self.redis_set_name
def add_fp(self, fp):
'''往去重容器添加一个指纹'''
self._redis.sadd(self._name, fp)
def exists(self, fp):
'''判断指纹是否在去重容器中'''
return self._redis.sismember(self._name, fp)
在调度器中使用这个set.py
, 使得分布式模式下的去重功能正常运作
# scrapy_plus/core/scheduler.py
from hashlib import sha1
import w3lib.url
from six.moves.queue import queue
from scrapy_plus.conf import settings
from scrapy_plus.queue import queue as redisqueue
from scrapy_plus.set import noramlfiltercontainer, redisfiltercontainer
from scrapy_plus.utils.log import logger
......
class scheduler(object):
'''
1. 缓存请求对象(request),并为下载器提供请求对象,实现请求的调度
2. 对请求对象进行去重判断
'''
def __init__(self,collector):
if scheduler_persist: #如果使用分布式或者是持久化,使用redis的队列
self.queue = reidsqueue()
self._filter_container = redisfiltercontainer() #使用redis作为python的去重的容器
else: #如果不适用分布式或者持久化,使用python的set作为去重的容器
self.queue = queue()
self._filter_container = noramlfiltercontainer()
self.repeate_request_num = 0
def _filter_request(self, request):
# 去重容器:存储已经发过的请求的特征 url 选用集合类型:set()
# 利用请求的url method data 求出一个指纹 利用sha1
request.fp = self._gen_fp(request) #把指纹作为request的一个属性
if not self._filter_container.exists(request.fp):
self._filter_container.add_fp(request.fp)
# logger.info("添加新的请求:<%s>" % request.url)
return true
else:
logger.info("发现重复的请求:<%s>" % request.url)
self.repeate_request_num =1
return false
程序结束的条件
在之前的单机版本的代码中,通过:总的响应 总的重复数>=总的请求来判断程序结束,但是在分布式的版本那种,每个服务器的请求数量和响应数量不在相同
因为每个服务器存入队列的请求,和成功发送的请求中间可能很多请求被其他的服务器发送了,导致数量不一致,所以可以把总的请求,总的响应,总的重复等信息记录在redis中,那么所有的服务端修改的数据的位置是同一个redis中的内容,所有的服务端判断退出的时候也是通过比较同一个redis中的这些数据来决定
此时,在utils中新建stats_collector.py文件,来实现对各种数量的统计,包括总的请求数量,总的响应数量,总的重复数量
import redis
from scrapy_plus.conf.settings import redis_queue_name, redis_queue_host, redis_queue_port, redis_queue_db
# redis队列默认配置
# redis_queue_name = 'request_queue'
# redis_queue_host = 'localhost'
# redis_queue_port = 6379
# redis_queue_db = 10
class statscollector(object):
def __init__(self, spider_names=[], host=redis_queue_host, port=redis_queue_port, \
db=redis_queue_db, password=none):
self.redis = redis.strictredis(host=host, port=port, db=db, password=password)
#存储请求数量的键
self.request_nums_key = "_".join(spider_names) "_request_nums"
#存储响应数量的键
self.response_nums_key = "_".join(spider_names) "_response_nums"
#存储重复请求的键
self.repeat_request_nums_key = "_".join(spider_names) "_repeat_request_nums"
#存储start_request数量的键
self.start_request_nums_key = "_".join(spider_names) "_start_request_nums"
def incr(self, key):
'''给键对应的值增加1,不存在会自动创建,并且值为1,'''
self.redis.incr(key)
def get(self, key):
'''获取键对应的值,不存在是为0,存在则获取并转化为int类型'''
ret = self.redis.get(key)
if not ret:
ret = 0
else:
ret = int(ret)
return ret
def clear(self):
'''程序结束后清空所有的值'''
self.redis.delete(self.request_nums_key, self.response_nums_key, \
self.repeat_request_nums_key, self.start_request_nums_key)
@property
def request_nums(self):
'''获取请求数量'''
return self.get(self.request_nums_key)
@property
def response_nums(self):
'''获取响应数量'''
return self.get(self.response_nums_key)
@property
def repeat_request_nums(self):
'''获取重复请求数量'''
return self.get(self.repeat_request_nums_key)
@property
def start_request_nums(self):
'''获取start_request数量'''
return self.get(self.start_request_nums_key)
修改engine.py
# coding=utf-8
import datetime
import time
import importlib
from scrapy_plus.conf.settings import spiders, pipelines, \
spider_middlewares, downloader_middlewares, max_async_number,async_type
if async_type == "coroutine":
from gevent.monkey import patch_all
patch_all()
from gevent.pool import pool
elif async_type == "thread":
from multiprocessing.dummy import pool
from .downloader import downloader
from .scheduler import scheduler
from scrapy_plus.http.request import request
from scrapy_plus.utils.log import logger
from scrapy_plus.utils.stats_collector import statscollector
class engine:
def __init__(self):
self.spiders = self._auto_import_instances(spiders, isspider=true)
self.downloader = downloader()
self.pipelines = self._auto_import_instances(pipelines)
self.collector = statscollector(self.spiders)
self.scheduler = scheduler(self.collector)
self.downloader_mids = self._auto_import_instances(downloader_middlewares)
self.spider_mids = self._auto_import_instances(spider_middlewares)
# self.total_request_num = 0
# self.total_response_num = 0
self.pool = pool(4)
self.is_running = false
def _auto_import_instances(self, path=[], isspider=false):
instances = {} if isspider else []
for p in path:
model_name = p.rsplit(".", 1)[0]
cls_name = p.rsplit(".", 1)[-1]
model = importlib.import_module(model_name)
cls = getattr(model, cls_name)
if isspider:
instances[cls.name] = cls()
else:
instances.append(cls())
return instances
def start(self):
t_start = datetime.datetime.now()
logger.info("爬虫开始启动:{}".format(t_start))
logger.info("爬虫运行模式:{}".format(async_type))
logger.info("最大并发数:{}".format(max_async_number))
logger.info("启动的爬虫有:{}".format(list(self.spiders.keys())))
logger.info("启动的下载中间件有:\n{}".format(downloader_middlewares))
logger.info("启动的爬虫中间件有:\n{}".format(spider_middlewares))
logger.info("启动的管道有:\n{}".format(pipelines))
self._start_engine()
t_end = datetime.datetime.now()
logger.info("爬虫结束:{}".format(t_end))
logger.info("耗时:%s" % (t_end - t_start).total_seconds())
# logger.info("一共获取了请求:{}个".format(self.total_request_num))
# logger.info("重复的请求:{}个".format(self.scheduler.repeate_request_num))
# logger.info("成功的请求:{}个".format(self.total_response_num))
logger.info("一共获取了请求:{}个".format(self.collector.request_nums))
logger.info("重复的请求:{}个".format(self.collector.repeat_request_nums))
logger.info("成功的请求:{}个".format(self.collector.response_nums))
self.collector.clear()
def _start_request_callback(self,temp):
self.collector.incr(self.collector.start_request_nums_key)
def _start_request(self):
def _func(spider_name,spider):
for start_request in spider.start_requests():
for spider_mid in self.spider_mids:
start_request = spider_mid.process_request(start_request)
start_request.spider_name = spider_name
self.scheduler.add_request(start_request)
#使用collector进行数据的收集
# self.total_request_num = 1
self.collector.incr(self.collector.request_nums_key)
for spider_name, spider in self.spiders.items():
self.pool.apply_async(_func,args=(spider_name,spider),callback=self._start_request_callback)
def _execute_request_response_item(self):
request = self.scheduler.get_request()
if request is none:
return
spider = self.spiders[request.spider_name]
for downloader_mid in self.downloader_mids:
request = downloader_mid.process_request(request)
response = self.downloader.get_response(request)
response.meta = request.meta
for downloader_mid in self.downloader_mids:
response = downloader_mid.process_response(response)
for spider_mid in self.spider_mids:
response = spider_mid.process_response(response)
parse = getattr(spider, request.parse)
for ret in parse(response):
if isinstance(ret, request):
ret.spider_name = request.spider_name
for spider_mid in self.spider_mids:
ret = spider_mid.process_request(ret)
self.scheduler.add_request(ret)
#使用collector进行数据的收集
# self.total_request_num = 1
self.collector.incr(self.collector.request_nums_key)
else:
for pipeline in self.pipelines:
pipeline.process_item(ret, spider)
#使用collector进行数据的收集
# self.total_response_num = 1
self.collector.incr(self.collector.response_nums_key)
def _callback(self, temp):
if self.is_running:
self.pool.apply_async(self._execute_request_response_item, callback=self._callback)
def _start_engine(self):
# spider中的start_url开始启动
self.is_running = true
self.pool.apply_async(self._start_request)
for i in range(max_async_number):
self.pool.apply_async(self._execute_request_response_item, callback=self._callback)
while true:
time.sleep(0.001)
# if self.total_response_num self.scheduler.repeate_request_num >= self.total_request_num
#当start_request的执行数量和爬虫的数量相同的时候
if self.collector.response_nums self.collector.repeat_request_nums >= self.collector.request_nums:
self.is_running = false
break