Scrapy-Redis库已经为我们提供了Scrapy分布式的队列、调度器、去重等功能,其GitHub地址为:https://github.com/rmax/scrapy-redis。

本节我们深入了解一下,利用Redis如何实现Scrapy分布式。

1. 获取源码

可以把源码Clone下来,执行如下命令:

git clone https://github.com/rmax/scrapy-redis.git

核心源码在scrapy-redis/src/scrapy_redis目录下。

2. 爬取队列

从爬取队列入手,看看它的具体实现。源码文件为queue.py,它有三个队列的实现,首先它实现了一个父类Base,提供一些基本方法和属性,如下所示:

class Base(object):    """Per-spider base queue class"""    def __init__(self, server, spider, key, serializer=None):        if serializer is None:            serializer = picklecompat        
       if not hasattr(serializer, 'loads'):            
           raise TypeError("serializer does not implement 'loads' function: %r"                            % serializer)        
       if not hasattr(serializer, 'dumps'):            
           raise TypeError("serializer '%s' does not implement 'dumps' function: %r"                            % serializer)        self.server = server        self.spider = spider        self.key = key % {'spider': spider.name}        self.serializer = serializer  
 
   def _encode_request(self, request):        obj = request_to_dict(request, self.spider)        
       return self.serializer.dumps(obj)  
        
   def _decode_request(self, encoded_request):        obj = self.serializer.loads(encoded_request)        
       return request_from_dict(obj, self.spider)
          
   def __len__(self):        """Return the length of the queue"""        raise NotImplementedError    
   
   def push(self, request):        """Push a request"""        raise NotImplementedError    
   
   def pop(self, timeout=0):        """Pop a request"""        raise NotImplementedError    
   
   def clear(self):        """Clear queue/stack"""        self.server.delete(self.key)

首先看一下_encode_request()和_decode_request()方法。我们要把一个Request对象存储到数据库中,但数据库无法直接存储对象,所以先要将Request序列化转成字符串,而这两个方法分别可以实现序列化和反序列化的操作,这个过程可以利用pickle库来实现。队列Queue在调用push()方法将Request存入数据库时,会调用_encode_request()方法进行序列化,在调用pop()取出Request时,会调用_decode_request()进行反序列化。

在父类中,__len__()push()pop()这三个方法都是未实现的,三个方法直接抛出NotImplementedError异常,因此这个类不能直接使用。那么,必须要实现一个子类来重写这三个方法,而不同的子类就会有不同的实现和不同的功能。

接下来我们定义一些子类来继承Base类,并重写这几个方法。在源码中有三个子类的实现,它们分别是FifoQueuePriorityQueueLifoQueue,我们分别来看看它们的实现原理。

首先是FifoQueue,如下所示:

class FifoQueue(Base):    """Per-spider FIFO queue"""    def __len__(self):        """Return the length of the queue"""        return self.server.llen(self.key)    
   
   def push(self, request):        """Push a request"""        self.server.lpush(self.key, self._encode_request(request))    
   
   def pop(self, timeout=0):        """Pop a request"""        if timeout > 0:            data = self.server.brpop(self.key, timeout)            
               if isinstance(data, tuple):                    data = data[1]        
       else:            data = self.server.rpop(self.key)        
       if data:            
           return self._decode_request(data)

这个类继承了Base类,并重写了__len__()push()pop()三个方法,这三个方法都是对server对象的操作。server对象就是一个Redis连接对象,我们可以直接调用其操作Redis的方法对数据库进行操作,这里的操作方法有llen()lpush()rpop()等,这就代表此爬取队列使用了Redis的列表。序列化后的Request会存入列表中,__len__()方法获取列表的长度,push()方法调用了lpush()操作,这代表从列表左侧存入数据,pop()方法中调用了rpop()操作,这代表从列表右侧取出数据。

Request在列表中的存取顺序是左侧进、右侧出,这是有序的进出,即先进先出(First Input First Output,FIFO),此类的名称就叫作FifoQueue

还有一个与之相反的实现类,叫作LifoQueue,实现如下:

class LifoQueue(Base):    """Per-spider LIFO queue."""    def __len__(self):        """Return the length of the stack"""        return self.server.llen(self.key)    
   
   def
push(self, request):
       """Push a request"""        self.server.lpush(self.key, self._encode_request(request))    
   
   def
pop(self, timeout=0):
       """Pop a request"""        if timeout > 0:            data = self.server.blpop(self.key, timeout)            
           if isinstance(data, tuple):                data = data[1]        
       else:            data = self.server.lpop(self.key)        
       if data:            
           return self._decode_request(data)

FifoQueue不同的是LifoQueuepop()方法,它使用的是lpop()操作,也就是从左侧出,push()方法依然使用lpush()操作,从左侧入。那么效果就是先进后出、后进先出(Last In First Out,LIFO),此类名称就叫作LifoQueue。这个存取方式类似栈的操作,所以也可以称作StackQueue

在源码中还有一个子类叫作PriorityQueue,顾名思义,它是优先级队列,实现如下:

class PriorityQueue(Base):    """Per-spider priority queue abstraction using redis' sorted set"""    def __len__(self):        """Return the length of the queue"""        return self.server.zcard(self.key)    

   def
push(self, request):
       """Push a request"""        data = self._encode_request(request)        score = -request.priority        self.server.execute_command('ZADD', self.key, score, data)    
   
   def
pop(self, timeout=0):
       """        Pop a request        timeout not support in this queue class        """        pipe = self.server.pipeline()        pipe.multi()        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)        results, count = pipe.execute()
       if results:            
           return self._decode_request(results[0])

在这里__len__()push()pop()方法使用了server对象的zcard()zadd()zrange()操作,这里使用的存储结果是有序集合,这个集合中的每个元素都可以设置一个分数,这个分数就代表优先级。

__len__()方法调用了zcard()操作,返回的就是有序集合的大小,也就是爬取队列的长度。push()方法调用了zadd()操作,就是向集合中添加元素,这里的分数指定成Request的优先级的相反数,分数低的会排在集合的前面,即高优先级的Request就会在集合的最前面。pop()方法首先调用了zrange()操作,取出集合的第一个元素,第一个元素就是最高优先级的Request,然后再调用zremrangebyrank()操作,将这个元素删除,这样就完成了取出并删除的操作。

此队列是默认使用的队列,即爬取队列默认是使用有序集合来存储的。

3. 去重过滤

前面说过Scrapy的去重是利用集合来实现的,而在Scrapy分布式中的去重就需要利用共享的集合,那么这里使用的就是Redis中的集合数据结构。我们来看看去重类是怎样实现的,源码文件是dupefilter.py,其内实现了一个RFPDupeFilter类,如下所示:

class RFPDupeFilter(BaseDupeFilter):    """Redis-based request duplicates filter.    This class can also be used with default Scrapy's scheduler.    """    logger = logger    
   def __init__(self, server, key, debug=False):        """Initialize the duplicates filter.        Parameters        ----------        server : redis.StrictRedis            The redis server instance.        key : str            Redis key Where to store fingerprints.        debug : bool, optional            Whether to log filtered requests.        """        self.server = server        self.key = key        self.debug = debug        self.logdupes = True    @classmethod    def from_settings(cls, settings):        """Returns an instance from given settings.        This uses by default the key ``dupefilter:<timestamp>``. When using the        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as        it needs to pass the spider name in the key.        Parameters        ----------        settings : scrapy.settings.Settings        Returns        -------        RFPDupeFilter            A RFPDupeFilter instance.        """        server = get_redis_from_settings(settings)        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}        debug = settings.getbool('DUPEFILTER_DEBUG')        
       return cls(server, key=key, debug=debug)
           
   @classmethod    def from_crawler(cls, crawler):        """Returns instance from crawler.        Parameters        ----------        crawler : scrapy.crawler.Crawler        Returns        -------        RFPDupeFilter            Instance of RFPDupeFilter.        """        return cls.from_settings(crawler.settings)
      
   def request_seen(self, request):        """Returns True if request was already seen.        Parameters        ----------        request : scrapy.http.Request        Returns        -------        bool        """        fp = self.request_fingerprint(request)        added = self.server.sadd(self.key, fp)        
       return added == 0    def request_fingerprint(self, request):        """Returns a fingerprint for a given request.        Parameters        ----------        request : scrapy.http.Request        Returns        -------        str        """        return request_fingerprint(request)    
       
   def close(self, reason=''):        """Delete data on close. Called by Scrapy's scheduler.        Parameters        ----------        reason : str, optional        """        self.clear()
   
   def clear(self):        """Clears fingerprints data."""        self.server.delete(self.key)
   
   def log(self, request, spider):        """Logs given request.        Parameters        ----------        request : scrapy.http.Request        spider : scrapy.spiders.Spider        """        if self.debug:            msg = "Filtered duplicate request: %(request)s"            self.logger.debug(msg, {'request': request}, extra={'spider': spider})        
       elif self.logdupes:            msg = ("Filtered duplicate request %(request)s"                   " - no more duplicates will be shown"                   " (see DUPEFILTER_DEBUG to show all duplicates)")            self.logger.debug(msg, {'request': request}, extra={'spider': spider})            self.logdupes = False

这里同样实现了一个request_seen()方法,和Scrapy中的request_seen()方法实现极其类似。不过这里集合使用的是server对象的sadd()操作,也就是集合不再是一个简单数据结构了,而是直接换成了数据库的存储方式。

鉴别重复的方式还是使用指纹,指纹同样是依靠request_fingerprint()方法来获取的。获取指纹之后就直接向集合添加指纹,如果添加成功,说明这个指纹原本不存在于集合中,返回值1。代码中最后的返回结果是判定添加结果是否为0,如果刚才的返回值为1,那这个判定结果就是False,也就是不重复,否则判定为重复。

这样我们就成功利用Redis的集合完成了指纹的记录和重复的验证。

4. 调度器

Scrapy-Redis还帮我们实现了配合Queue、DupeFilter使用的调度器Scheduler,源文件名称是scheduler.py。我们可以指定一些配置,如SCHEDULER_FLUSH_ON_START即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST即是否在爬取结束后保持爬取队列不清除。我们可以在settings.py里自由配置,而此调度器很好地实现了对接。

接下来我们看看两个核心的存取方法,实现如下所示:

def enqueue_request(self, request):    if not request.dont_filter and self.df.request_seen(request):        self.df.log(request, self.spider)        
       return False    if self.stats:        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)    self.queue.push(request)    
   return True

def next_request(self):    block_pop_timeout = self.idle_before_close    request = self.queue.pop(block_pop_timeout)    
   if request and self.stats:        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)    
   return request

enqueue_request()可以向队列中添加Request,核心操作就是调用Queue的push()操作,还有一些统计和日志操作。next_request()就是从队列中取Request,核心操作就是调用Queue的pop()操作,此时如果队列中还有Request,则Request会直接取出来,爬取继续,否则如果队列为空,爬取则会重新开始。

5. 总结

目前为止,我们就之前所说的三个分布式的问题解决了,总结如下。

  • 爬取队列的实现。这里提供了三种队列,使用了Redis的列表或有序集合来维护。

  • 去重的实现。这里使用了Redis的集合来保存Request的指纹,以提供重复过滤。

  • 中断后重新爬取的实现。中断后Redis的队列没有清空,爬取再次启动时,调度器的next_request()会从队列中取到下一个Request,爬取继续。


©著作权归作者所有:来自51CTO博客作者mb5fe159f193922的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Selenium的使用方法简介
  2. Python数据科学:正则化方法
  3. 11种数据分析方法,别再说你不会了
  4. 学编程这么久,还傻傻分不清什么是方法(method),什么是函数(function)?
  5. 数据结构之优先队列和堆
  6. 最全总结!聊聊 Python 操作PDF的几种方法(合并、拆分、水印、加密)
  7. 数据分析方法论
  8. 异步函数中的异常处理及测试方法 [每日前端夜话(0x18)]
  9. Python 进阶之源码分析:如何将一个类方法变为多个方法?

随机推荐

  1. 在contenteditable中的占位符—焦点事件
  2. CasperJs和Jquery用链式选择
  3. 当函数在单独的PHP文件中定义时,调用JavaS
  4. 用javascript 禁止右键,禁止复制,禁止粘
  5. hzjs颠覆jquery,按照中国人思维开发的最简
  6. 百度搜索功能
  7. 如何检查不包含提交按钮的HTML5表单的有
  8. 为什么不调用我的jQuery.get()回调?
  9. 如何在我的页面上以漂亮的格式显示JSON对
  10. 获取Backbone Model实例的模型/类名