爬虫进阶:框架功能升级之断点续爬 | 爬虫 |《python学习之路》| python 技术论坛-380玩彩网官网入口

未匹配的标注

断点续爬

断点续爬设计分析

断点续爬设计原理介绍:

断点续爬的效果:爬虫程序中止后,再次启动,对已经发起的请求不再发起,而是直接从之前的队列中获取请求继续执行。这也就意味着需要实现以下两点:

1.去重标识(历史请求的指纹)持久化存储,使得新的请求可以和以前的请求进行去重对比

2.请求队列也需要持久化存储

其实也就是程序的中止不会造成请求队列和去重容器的消失,再次启动程序后,还能继续访问它们。

断点续爬无丢失方案的实现

断点续爬无丢失的代码实现:

  • 添加备份容器:利用redis的hash类型类对每一个请求对象进行存储
  • 为request对象设置重试次数属性
  • 在调度器的get_request方法中实现响应的逻辑判断
  • 实现delete_request方法:从备份中删除对应的reqeust对象
  • 实现add_lost_request方法
  • 在引擎中调用这些方法,完成断点续爬无丢失需求
# scrapy_plus/redis_hash.py
'''实现一个对redis哈希类型的操作封装'''
import redis
import pickle
from scrapy_plus.http.request import request
from scrapy_plus.conf import settings
class redisbackuprequest(object):
    '''利用hash类型,存储每一个请求对象,key是指纹,值就是请求对象'''
    redis_backup_name = settings.redis_backup_name
    redis_backup_host = settings.redis_backup_host
    redis_backup_port = settings.redis_backup_port
    redis_backup_db = settings.redis_backup_db
    def __init__(self):
        self._redis = redis.strictredis(host=self.redis_backup_host, port=self.redis_backup_port ,db=self.redis_backup_db)
        self._name = self.redis_backup_name
    # 增删改查
    def save_request(self, fp, request):
        '''将请求对象备份到redis的hash中'''
        bytes_data = pickle.dumps(request)
        self._redis.hset(self._name, fp, bytes_data)
    def delete_request(self, fp):
        '''根据请求的指纹,将其删除'''
        self._redis.hdel(self._name, fp)
    def update_request(self, fp, request):
        '''更新已有的fp'''
        self.save_request(fp, request)
    def get_requests(self):
        '''返回全部的请求对象'''
        for fp, bytes_request in self._redis.hscan_iter(self._name):
            request = pickle.loads(bytes_request)
            yield request
  • 为request对象增加重试次数属性:

      class request(object):
          '''框架内置请求对象,设置请求信息'''
          def __init__(self, url, method='get', headers=none, params=none, data=none, parse='parse', filter=true, meta=none):
              self.url = url    # 请求地址
              self.method = method    # 请求方法
              self.headers = headers    # 请求头
              self.params = params    # 请求参数
              self.data = data    # 请求体
              self.parse = parse    # 指明它的解析函数, 默认是parse方法
              self.filter = filter  # 是否进行去重,默认是true
              self.retry_time = 0    # 重试次数
              self.meta = meta
  • 修改调度器,实现对应的逻辑以及方法:

      # scrapy_plus/core/scheduler.py
      ......
      from scrapy_plus.redis_hash import redisbackuprequest
      ......
      class scheduler(object):
          '''
          缓存请求对象(request),并为下载器提供请求对象,实现请求的调度
          对请求对象进行去重判断
          '''
          def __init__(self,collector):
              if scheduler_persist: #如果使用分布式或者是持久化,使用redis的队列
                  self.queue = reidsqueue()
                  self._filter_container = redisfiltercontainer()
              else:
                  self.queue = queue()
                  self._filter_container = noramlfiltercontainer()
              self.collector = collector
          def add_reqeust(self, request):
              '''存储request对象进入队列
              return: none
              '''
              # 先判断是否要去重
              if request.filter is false:
                  self.queue.put(request)
                  logger.info("添加请求成功[%s %s]" % (request.method, request.url))
                  self.total_request_number  = 1  # 统计请求总数
                  return # 必须return
              # 判断去重,如果重复,就不添加,否则才添加
              fp = self._gen_fp(request)
              if not self.filter_request(fp, request):
                  # 往队列添加请求
                  logger.info("添加请求成功[%s %s]"%(request.method.upper(), request.url))
                  self.queue.put(request)
                  if settings.role in ['master', 'slave']:
                      self._backup_request.save_request(fp, request)   # 对请求进行备份
                  # 如果是新的请求,那么就添加进去重容器,表示请求已经添加到了队列中
                  self._filter_container.add_fp(fp)
                  self.total_request_number  = 1
              else:
                  self.repeat_request_number  = 1
          def get_request(self):
              '''从队列取出一个请求对象
              return: request object
              '''
              try:
                  request = self.queue.get(false)
              except:
                  return none
              else:
                  if request.filter is true and settings.role in ['master', 'slave']:  # 先判断 是否需要进行去重
                      # 判断重试次数是否超过规定
                      fp = self._gen_fp(request)
                      if request.retry_time >= settings.max_retry_times:
                          self._backup_request.delete_request(fp)    # 如果超过,那么直接删除
                          logger.warnning("出现异常请求,且超过最大尝试的次数:[%s]%s"%(request.method, request.url))
                      request.retry_time  = 1   # 重试次数1
                      self._backup_request.update_request(fp, request)  # 并更新到备份中
                  return request
          def delete_request(self, request):
              '''根据请求从备份删除对应的请求对象'''
              if settings.role in ['master', 'slave']:
                  fp = self._gen_fp(request)
                  self._backup_request.delete_request(fp)
          def add_lost_reqeusts(self):
              '''将丢失的请求对象再添加到队列中'''
              # 从备份容器取出来,放到队列中
              if settings.role in ['master', 'slave']:
                  for request in self._backup_request.get_requests():
                      self.queue.put(request)
          ......

本文章首发在 380玩彩网官网入口 网站上。

上一篇 下一篇
讨论数量: 0



暂无话题~
网站地图