Golang(Go语言)爬虫框架colly简明教程及源码阅读与分析

使用了一下colly这个爬虫框架,发现非常的好用,它的设计还是值得学习一下的,API设计的非常简洁。不过首先 我们要看看这玩意儿咋用。

colly的安装和使用

首先如果我们在项目里引用,就要先安装一下:

$ go get -u github.com/gocolly/colly/...

我们看个简单的demo:

package main

import (
	"fmt"

	"github.com/gocolly/colly"
	"github.com/gocolly/colly/extensions"
)

func main() {
	c := colly.NewCollector(
		colly.AllowedDomains("httpbin.org"), // 要限定域名,否则就把全网都爬下来了
	)

	extensions.RandomUserAgent(c) // 使用随机的UserAgent,最好能使用代理。这样就不容易被ban
	extensions.Referer(c)

	// Find and visit all links
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		e.Request.Visit(e.Attr("href"))
	})

	c.OnRequest(func(r *colly.Request) {
		fmt.Println("Visiting", r.URL)
	})

	c.Visit("http://httpbin.org/")
}

我们来运行一下:

$ go build -o main && ./main
Visiting https://httpbin.org/
...

colly有两个比较好用的扩展:

  • extensions.RandomUserAgent(c) 这个是在访问的时候,使用随机的UserAgent,来模拟不同的浏览器访问
  • extensions.Referrer(c) 这个是在访问的时候带上Referrer,意思就是这一次点击是从哪个页面产生的

colly 有几个常用的回调方法:

  • OnError 是指出错的时候的回调,回调函数签名为 type ErrorCallback func(*Response, error)
  • OnHTML 是指返回结果是 HTML 的时候的回调,回调函数签名为 type HTMLCallback func(*HTMLElement) , OnXML 和它类似
  • OnRequest 是在发起请求之前的回调
  • OnResponse 是在收到响应之后的回调
  • OnScraped 是指在抓取完成之后执行的回调,也就是在 OnHTML 之后。关于这些回调函数的执行顺序,我们会在源码分析这一节讲述

colly源码分析

分析colly的源码,我们主要就是想弄清楚colly到底是怎么实现的,我们从上面的demo可以看出来,首先我们执行了 c := colly.NewCollector(), 然后我们执行了 c.Visit() 开始访问。

我们从 NewCollector 入手,看看 Collector 是什么东西,使用代码跳转到 NewCollector 的定义所在,然后找到 Collector 的定义所在:

// Collector provides the scraper instance for a scraping job
type Collector struct {
	// UserAgent is the User-Agent string used by HTTP requests
	UserAgent string
	// MaxDepth limits the recursion depth of visited URLs.
	// Set it to 0 for infinite recursion (default).
	MaxDepth int
	// AllowedDomains is a domain whitelist.
	// Leave it blank to allow any domains to be visited
	AllowedDomains []string
	// DisallowedDomains is a domain blacklist.
	DisallowedDomains []string
	// DisallowedURLFilters is a list of regular expressions which restricts
	// visiting URLs. If any of the rules matches to a URL the
	// request will be stopped. DisallowedURLFilters will
	// be evaluated before URLFilters
	// Leave it blank to allow any URLs to be visited
	DisallowedURLFilters []*regexp.Regexp
	// URLFilters is a list of regular expressions which restricts
	// visiting URLs. If any of the rules matches to a URL the
	// request won't be stopped. DisallowedURLFilters will
	// be evaluated before URLFilters

	// Leave it blank to allow any URLs to be visited
	URLFilters []*regexp.Regexp

	// AllowURLRevisit allows multiple downloads of the same URL
	AllowURLRevisit bool
	// MaxBodySize is the limit of the retrieved response body in bytes.
	// 0 means unlimited.
	// The default value for MaxBodySize is 10MB (10 * 1024 * 1024 bytes).
	MaxBodySize int
	// CacheDir specifies a location where GET requests are cached as files.
	// When it's not defined, caching is disabled.
	CacheDir string
	// IgnoreRobotsTxt allows the Collector to ignore any restrictions set by
	// the target host's robots.txt file.  See http://www.robotstxt.org/ for more
	// information.
	IgnoreRobotsTxt bool
	// Async turns on asynchronous network communication. Use Collector.Wait() to
	// be sure all requests have been finished.
	Async bool
	// ParseHTTPErrorResponse allows parsing HTTP responses with non 2xx status codes.
	// By default, Colly parses only successful HTTP responses. Set ParseHTTPErrorResponse
	// to true to enable it.
	ParseHTTPErrorResponse bool
	// ID is the unique identifier of a collector
	ID uint32
	// DetectCharset can enable character encoding detection for non-utf8 response bodies
	// without explicit charset declaration. This feature uses https://github.com/saintfish/chardet
	DetectCharset bool
	// RedirectHandler allows control on how a redirect will be managed
	RedirectHandler func(req *http.Request, via []*http.Request) error
	// CheckHead performs a HEAD request before every GET to pre-validate the response
	CheckHead         bool
	store             storage.Storage
	debugger          debug.Debugger
	robotsMap         map[string]*robotstxt.RobotsData
	htmlCallbacks     []*htmlCallbackContainer
	xmlCallbacks      []*xmlCallbackContainer
	requestCallbacks  []RequestCallback
	responseCallbacks []ResponseCallback
	errorCallbacks    []ErrorCallback
	scrapedCallbacks  []ScrapedCallback
	requestCount      uint32
	responseCount     uint32
	backend           *httpBackend
	wg                *sync.WaitGroup
	lock              *sync.RWMutex
}

可以看出来,这就是colly中集中所有运行时候需要的东西的地方,注意,最下面有几个重要的东西:

  • store storage.Storage 这个是后端存储,即把东西爬下来了存哪里,具体使用可以参考文档
  • 一系列的 xxxCallbacks 这个就是我们设置的回调存储的地方

我们看下我们demo的执行,随便来一个,例如 c.OnHTML 那个:

// OnHTML registers a function. Function will be executed on every HTML
// element matched by the GoQuery Selector parameter.
// GoQuery Selector is a selector used by https://github.com/PuerkitoBio/goquery
func (c *Collector) OnHTML(goquerySelector string, f HTMLCallback) {
	c.lock.Lock()
	if c.htmlCallbacks == nil {
		c.htmlCallbacks = make([]*htmlCallbackContainer, 0, 4)
	}
	c.htmlCallbacks = append(c.htmlCallbacks, &htmlCallbackContainer{
		Selector: goquerySelector,
		Function: f,
	})
	c.lock.Unlock()
}

就可以看出来,其实回调函数就是存在 c.htmlCallbacks 里。

接下来我们看 c.Visit,它是整个流程的启动者:

// Visit starts Collector's collecting job by creating a
// request to the URL specified in parameter.
// Visit also calls the previously provided callbacks
func (c *Collector) Visit(URL string) error {
	if c.CheckHead {
		if check := c.scrape(URL, "HEAD", 1, nil, nil, nil, true); check != nil {
			return check
		}
	}
	return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

然后我们追踪到 c.scrape

func (c *Collector) scrape(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, checkRevisit bool) error {
	if err := c.requestCheck(u, method, depth, checkRevisit); err != nil {
		return err
	}
	parsedURL, err := url.Parse(u)
	if err != nil {
		return err
	}
	if parsedURL.Scheme == "" {
		parsedURL.Scheme = "http"
	}
	if !c.isDomainAllowed(parsedURL.Host) {
		return ErrForbiddenDomain
	}
	if method != "HEAD" && !c.IgnoreRobotsTxt {
		if err = c.checkRobots(parsedURL); err != nil {
			return err
		}
	}
	if hdr == nil {
		hdr = http.Header{"User-Agent": []string{c.UserAgent}}
	}
	rc, ok := requestData.(io.ReadCloser)
	if !ok && requestData != nil {
		rc = ioutil.NopCloser(requestData)
	}
	req := &http.Request{
		Method:     method,
		URL:        parsedURL,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     hdr,
		Body:       rc,
		Host:       parsedURL.Host,
	}
	setRequestBody(req, requestData)
	u = parsedURL.String()
	c.wg.Add(1)
	if c.Async {
		go c.fetch(u, method, depth, requestData, ctx, hdr, req)
		return nil
	}
	return c.fetch(u, method, depth, requestData, ctx, hdr, req)
}

c.scrape 做的事情就是,首先执行 c.requestCheck 检查一下URL是不是ok呀,是不是访问过了呀之类的,然后解析一下URL, 判断域名是不是允许访问,然后组装好请求之后,传递到 c.fetch 里,我们继续跟:

func (c *Collector) fetch(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, req *http.Request) error {
	defer c.wg.Done()
	if ctx == nil {
		ctx = NewContext()
	}
	request := &Request{
		URL:       req.URL,
		Headers:   &req.Header,
		Ctx:       ctx,
		Depth:     depth,
		Method:    method,
		Body:      requestData,
		collector: c,
		ID:        atomic.AddUint32(&c.requestCount, 1),
	}

	c.handleOnRequest(request)

	if request.abort {
		return nil
	}

	if method == "POST" && req.Header.Get("Content-Type") == "" {
		req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	}

	if req.Header.Get("Accept") == "" {
		req.Header.Set("Accept", "*/*")
	}

	origURL := req.URL
	response, err := c.backend.Cache(req, c.MaxBodySize, c.CacheDir)
	if proxyURL, ok := req.Context().Value(ProxyURLKey).(string); ok {
		request.ProxyURL = proxyURL
	}
	if err := c.handleOnError(response, err, request, ctx); err != nil {
		return err
	}
	if req.URL != origURL {
		request.URL = req.URL
		request.Headers = &req.Header
	}
	atomic.AddUint32(&c.responseCount, 1)
	response.Ctx = ctx
	response.Request = request

	err = response.fixCharset(c.DetectCharset, request.ResponseCharacterEncoding)
	if err != nil {
		return err
	}

	c.handleOnResponse(response)

	err = c.handleOnHTML(response)
	if err != nil {
		c.handleOnError(response, err, request, ctx)
	}

	err = c.handleOnXML(response)
	if err != nil {
		c.handleOnError(response, err, request, ctx)
	}

	c.handleOnScraped(response)

	return err
}

这里就涉及到我们上面说的回调的顺序了,可以从代码里看出来,依次是:

  • c.handleOnRequest(request)
  • if err := c.handleOnError(response, err, request, ctx); err != nil {
  • c.handleOnResponse(response)
  • err = c.handleOnHTML(response)
  • err = c.handleOnXML(response)
  • c.handleOnScraped(response)

duang,顺序一下就明了了。

至于store就由读者自己去看吧,其实很简单,实现这个接口就好了:

// Storage is an interface which handles Collector's internal data,
// like visited urls and cookies.
// The default Storage of the Collector is the InMemoryStorage.
// Collector's storage can be changed by calling Collector.SetStorage()
// function.
type Storage interface {
	// Init initializes the storage
	Init() error
	// Visited receives and stores a request ID that is visited by the Collector
	Visited(requestID uint64) error
	// IsVisited returns true if the request was visited before IsVisited
	// is called
	IsVisited(requestID uint64) (bool, error)
	// Cookies retrieves stored cookies for a given host
	Cookies(u *url.URL) string
	// SetCookies stores cookies for a given host
	SetCookies(u *url.URL, cookies string)
}

官方文档上有redis,sqlite3等等后端支持。


参考资料:


更多文章
  • Git HTTPS 如何保存密码
  • 程序员修炼之道 阅读笔记
  • Python开发实践经验
  • Golang实现平滑重启(优雅重启)
  • traefik 教程
  • Web开发系列(十):事务和锁
  • Web开发系列(十一):数据库扩展
  • Nginx作为TCP/UDP的负载均衡
  • Web开发简介系列
  • Web开发系列(九):消息队列,异步任务
  • Nginx 请求匹配规则
  • Web开发系列(六):关系型数据库,ORM
  • Web开发系列(七):缓存,CDN
  • Web开发系列(八):单点故障,负载均衡
  • Web开发系列(五):form, json, xml