An Introduction to Storm Crawler

Date: 2021-02-22

Introduction

I started trying to work with Storm Crawler for a project and found the documentation to be a little spartan. I had figured that I was working with a general purpose crawler, but that’s not the case.

It turns out things are not quite so simple: while Storm Crawler does work out of the box, it’s really an SDK for developing your own crawling system. The documentation is minimal, and most of the available information is in a series of blog posts on the Digital Pebble website labelled tutorial or storm-crawler.

So, this is the first in what’s probably a series of blog posts aiming to understand what it is and how it all works.

Storm Crawler Overview

Storm Crawler is an SDK based on Apache Storm for developing your own crawler. It’s heavily customisable and you can do some basic crawling out of the box. At the end of the day though, you’re going to want to use the framework to develop a customised crawler that meets your business needs.

If you’re not already familiar with Storm, it’s a framework for running large jobs spread among a number of compute nodes. You can also run it in local mode, for smaller tasks or development. I’m not going to get into the details of running a Storm cluster here, but there are some concepts to be familiar with in order to understand how Storm Crawler itself works.

Topology: An application. Storm topologies are long running applications which take input and process it.

Stream: A series of tuples passed between Spouts and Bolts.

Spout: The source of a stream or streams. This typically reads from an external source and injects it into the topology. In Storm Crawler, built-in Spouts include FileSpout (reads from a file) and MemorySpout (the list of URLs is provided through an in-memory structure). In the External tree, you can find examples such as SQLSpout (read from a DB), SolrSpout (read from Apache Solr) and WARCSpout (read from an ISO 28500 Web Archive).

Bolt: A processing engine. Takes input from a stream or streams, processes it and outputs it to other stream(s). In Storm Crawler this includes FetcherBolt (fetch a page), JSoupParserBolt (parse the HTML of a page, NB: this does NOT parse JavaScript), URLPartitionerBolt (organise how URLs will be divided into buckets for processing) and many more.

Stream Grouping: The definition of how the tuples of a stream are partitioned among a bolt’s tasks.

Task: An instance of a Spout or Bolt. Each task corresponds to one thread of execution.

Worker: A Java Virtual Machine (JVM) instance.
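To make stream grouping a little more concrete, here is a minimal sketch of the idea behind a fields grouping: tuples with the same value in the grouping field always land on the same task. It doesn't use Storm at all. The class FieldsGroupingSketch and its methods are names I made up for illustration, and Storm's real implementation differs in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a fields grouping. Not Storm code: all names are hypothetical.
final class FieldsGroupingSketch {

    // Map a grouping key to one of numTasks tasks.
    // The same key always maps to the same task index.
    static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Distribute a list of keys over the tasks, keeping input order per task.
    static Map<Integer, List<String>> assign(List<String> keys, int numTasks) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String key : keys) {
            byTask.computeIfAbsent(taskFor(key, numTasks), t -> new ArrayList<>()).add(key);
        }
        return byTask;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("example.com", "example.org", "example.com", "example.net");
        assign(keys, 3).forEach((task, ks) -> System.out.println("task " + task + " <- " + ks));
    }
}
```

Whatever the hash values turn out to be, both occurrences of example.com end up in the same task's list. A shuffle grouping, by contrast, makes no such promise.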

Understanding a Storm Crawler Topology

Now that we have some idea of the parts involved, let’s take a look at one of the out of the box configurations and see how it all hangs together. For this example, I’m going to use the Elasticsearch configuration as that’s what I’ve been working with.

Firstly, the topology is defined in a new class, ESCrawlTopology, which extends ConfigurableTopology. Storm Crawler also has the ability to use Apache Flux to define the topology, which we’ll look at further down.

        builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
        Fields key = new Fields("url");
        builder.setBolt("filter", new URLFilterBolt())
                .fieldsGrouping("filespout", Constants.StatusStreamName, key);
        builder.setSpout("spout", new AggregationSpout(), numShards);
        builder.setBolt("status_metrics", new StatusMetricsBolt())
                .shuffleGrouping("spout");
        builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers)
                .shuffleGrouping("spout");
        builder.setBolt("fetch", new FetcherBolt(), numWorkers)
                .fieldsGrouping("partitioner", new Fields("key"));
        builder.setBolt("sitemap", new SiteMapParserBolt(), numWorkers)
                .localOrShuffleGrouping("fetch");
        builder.setBolt("parse", new JSoupParserBolt(), numWorkers)
                .localOrShuffleGrouping("sitemap");
        builder.setBolt("indexer", new IndexerBolt(), numWorkers)
                .localOrShuffleGrouping("parse");
        builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
                .fieldsGrouping("fetch", Constants.StatusStreamName, key)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, key)
                .fieldsGrouping("parse", Constants.StatusStreamName, key)
                .fieldsGrouping("indexer", Constants.StatusStreamName, key)
                .customGrouping("filter", Constants.StatusStreamName,
                        new URLStreamGrouping());
        builder.setBolt("deleter", new DeletionBolt(), numWorkers)
                .localOrShuffleGrouping("status",
                        Constants.DELETION_STREAM_NAME);

As we can see above, each Spout or Bolt is declared with a name, its class and, for bolts, a grouping. The grouping also names the source component for the bolt (e.g. shuffleGrouping(“spout”) takes its input from the spout component). As we may remember from the Storm concepts document, this subscribes to the default stream of the “spout” component. We can also see non-default stream usage in the declaration of the deleter bolt, which declares that it wants the Constants.DELETION_STREAM_NAME stream from the status bolt. Similarly, the status bolt subscribes to the Constants.StatusStreamName stream of the fetch, sitemap, parse, indexer and filter bolts.
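The difference between the default stream and a named stream can be sketched with a small toy model. This is not Storm code: NamedStreamsSketch, subscribe and receivers are hypothetical names. The only things taken from the real system are the stream ids, "default" for Storm's default stream and "status" as used in the Flux file for the status stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of named streams. Not Storm code: all names here are hypothetical.
final class NamedStreamsSketch {
    static final String DEFAULT_STREAM = "default";
    static final String STATUS_STREAM = "status";

    // "source/stream" -> ids of the components subscribed to that stream
    private final Map<String, List<String>> subscriptions = new HashMap<>();

    // Register target as a consumer of the given stream of source.
    void subscribe(String target, String source, String stream) {
        subscriptions.computeIfAbsent(source + "/" + stream, k -> new ArrayList<>()).add(target);
    }

    // Components that would receive a tuple emitted by source on stream.
    List<String> receivers(String source, String stream) {
        return subscriptions.getOrDefault(source + "/" + stream, List.of());
    }

    public static void main(String[] args) {
        NamedStreamsSketch t = new NamedStreamsSketch();
        // Roughly what localOrShuffleGrouping("fetch") expresses
        t.subscribe("sitemap", "fetch", DEFAULT_STREAM);
        // Roughly what fieldsGrouping("fetch", Constants.StatusStreamName, key) expresses
        t.subscribe("status", "fetch", STATUS_STREAM);
        System.out.println(t.receivers("fetch", DEFAULT_STREAM));
        System.out.println(t.receivers("fetch", STATUS_STREAM));
    }
}
```

The point is that a bolt only sees the streams it asked for: the fetch bolt's page content flows to sitemap, while its status updates flow to status.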

Instead of using a Java class to configure the topology, we can also use Flux, which is generally preferred as it’s dynamic and doesn’t require recompiling the master JAR. Here’s the Flux configuration which ships with the Elasticsearch crawler: es-crawler.flux

The spouts and bolts sections set up our various classes and tell Storm how many tasks of each we will need (the parallelism).

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
    parallelism: 10

  - id: "filespout"
    className: "com.digitalpebble.stormcrawler.spout.FileSpout"
    parallelism: 1
    constructorArgs:
      - "."
      - "seeds.txt"
      - true

bolts:
  - id: "filter"
    className: "com.digitalpebble.stormcrawler.bolt.URLFilterBolt"
    parallelism: 1
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 1
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 1
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 1
  - id: "index"
    className: "com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt"
    parallelism: 1
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 1
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 1

The streams section defines how data will be passed around and is the key to understanding how this particular topology functions. Note that, unlike the ConfigurableTopology, DeletionBolt is not in the default Flux configuration, and that some ids differ (fetcher and index here, fetch and indexer in the Java class).

streams:
  - from: "spout"
    to: "partitioner"
    grouping:
      type: SHUFFLE

  - from: "spout"
    to: "status_metrics"
    grouping:
      type: SHUFFLE

  - from: "partitioner"
    to: "fetcher"
    grouping:
      type: FIELDS
      args: ["key"]

  - from: "fetcher"
    to: "sitemap"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "sitemap"
    to: "parse"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "parse"
    to: "index"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "fetcher"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "sitemap"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "parse"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "index"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "filespout"
    to: "filter"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "filter"
    to: "status"
    grouping:
      streamId: "status"
      type: CUSTOM
      customClass:
        className: "com.digitalpebble.stormcrawler.util.URLStreamGrouping"
        constructorArgs:
          - "byDomain"
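One way I found to read the streams section is as a plain edge list. The sketch below copies the from, to and streamId values from the Flux file above into a list and answers two questions: who feeds a component, and where a component's output goes on a given stream. FluxStreamsSketch and its helpers are hypothetical names, and "default" stands in for entries that give no streamId.

```java
import java.util.ArrayList;
import java.util.List;

// The streams section of the Flux file as an edge list. Names are hypothetical.
final class FluxStreamsSketch {

    static final class Edge {
        final String from;
        final String to;
        final String streamId;

        Edge(String from, String to, String streamId) {
            this.from = from;
            this.to = to;
            this.streamId = streamId;
        }
    }

    // Copied from the streams section; "default" means no streamId was given.
    static final List<Edge> STREAMS = List.of(
            new Edge("spout", "partitioner", "default"),
            new Edge("spout", "status_metrics", "default"),
            new Edge("partitioner", "fetcher", "default"),
            new Edge("fetcher", "sitemap", "default"),
            new Edge("sitemap", "parse", "default"),
            new Edge("parse", "index", "default"),
            new Edge("fetcher", "status", "status"),
            new Edge("sitemap", "status", "status"),
            new Edge("parse", "status", "status"),
            new Edge("index", "status", "status"),
            new Edge("filespout", "filter", "status"),
            new Edge("filter", "status", "status"));

    // Ids of every component that sends tuples to target, on any stream.
    static List<String> sourcesOf(String target) {
        List<String> out = new ArrayList<>();
        for (Edge e : STREAMS) {
            if (e.to.equals(target)) out.add(e.from);
        }
        return out;
    }

    // Ids of every component that receives the given stream of source.
    static List<String> targetsOf(String source, String streamId) {
        List<String> out = new ArrayList<>();
        for (Edge e : STREAMS) {
            if (e.from.equals(source) && e.streamId.equals(streamId)) out.add(e.to);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("feeds status: " + sourcesOf("status"));
        System.out.println("spout default stream goes to: " + targetsOf("spout", "default"));
    }
}
```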

That’s a lot to take in, so let’s see how it looks as a graph:

graph LR
  spout([AggregationSpout])
  filespout([FileSpout])
  filter[[URLFilterBolt]]
  partitioner[[URLPartitionerBolt]]
  fetcher[[FetcherBolt]]
  sitemap[[SiteMapParserBolt]]
  parse[[JSoupParserBolt]]
  index[[IndexerBolt]]
  status[[StatusUpdaterBolt]]
  status_metrics[[StatusMetricsBolt]]
  seeds[(seeds.txt)]
  seeds --> filespout
  filespout --> filter
  filter --> status
  spout --> partitioner
  spout --> status_metrics
  partitioner --> fetcher
  fetcher --> sitemap
  sitemap --> parse
  parse --> index
  fetcher --> status
  sitemap --> status
  parse --> status
  index --> status

That’s a bit easier to see, but it looks like we’re missing something. We know that Elasticsearch is involved here somewhere as a storage mechanism and that AggregationSpout has to be sourcing data from somewhere. So, let’s see what’s inside.

/**
 * Spout which pulls URL from an ES index. Use a single instance unless you use
 * 'es.status.routing' with the StatusUpdaterBolt, in which case you need to
 * have exactly the same number of spout instances as ES shards. Guarantees a
 * good mix of URLs by aggregating them by an arbitrary field e.g.
 * key.
 **/
@SuppressWarnings("serial")
public class AggregationSpout extends AbstractSpout implements
        ActionListener<SearchResponse> {

And inside StatusUpdaterBolt we see:

/**
 * Simple bolt which stores the status of URLs into ElasticSearch. Takes the
 * tuples coming from the 'status' stream. To be used in combination with a
 * Spout to read from the index.
 **/
@SuppressWarnings("serial")
public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt implements
        RemovalListener<String, List<Tuple>>, BulkProcessor.Listener {

Along with these, the Elasticsearch tree includes two other bolts:

  • DeletionBolt
    /**
     * Deletes documents to ElasticSearch. This should be connected to the
     * StatusUpdaterBolt via the 'deletion' stream and will remove the documents
     * with a status of ERROR one by one. Note that this component will also try to
     * delete documents even though they were never indexed and it currently won't
     * delete documents which were indexed under the canonical URL.
     */
    public class DeletionBolt extends BaseRichBolt {
  • IndexerBolt
    /**
     * Sends documents to ElasticSearch. Indexes all the fields from the tuples or a
     * Map &lt;String,Object&gt; from a named field.
     */
    @SuppressWarnings("serial")
    public class IndexerBolt extends AbstractIndexerBolt implements
            RemovalListener<String, List<Tuple>>, BulkProcessor.Listener {

This finally fills in the complete picture. Note that the Flux configuration does not include the DeletionBolt, but I’ve included it as it’s in the ConfigurableTopology.

graph LR
  deleter[[DeletionBolt]]
  elasticsearch[(Elasticsearch)]
  fetcher[[FetcherBolt]]
  filespout([FileSpout])
  filter[[URLFilterBolt]]
  index[[IndexerBolt]]
  parse[[JSoupParserBolt]]
  partitioner[[URLPartitionerBolt]]
  seeds[(seeds.txt)]
  sitemap[[SiteMapParserBolt]]
  spout([AggregationSpout])
  status[[StatusUpdaterBolt]]
  status_metrics[[StatusMetricsBolt]]
  seeds --> filespout
  filespout --> filter
  filter --> status
  spout --> partitioner
  spout --> status_metrics
  partitioner --> fetcher
  fetcher --> sitemap
  sitemap --> parse
  parse --> index
  fetcher --> status
  sitemap --> status
  parse --> status
  index --> status
  elasticsearch --> spout
  status -- url status --> elasticsearch
  status --> deleter
  index -- documents --> elasticsearch
  deleter -- delete documents --> elasticsearch

Bolts and Spouts

This section gives an overview of each of the Bolts and Spouts in the Storm Crawler distribution as of 2021-02-22. These are the nuts and bolts of Storm Crawler, and it’s likely that you’ll need to write your own Bolts and/or Spouts to meet your business needs.

Core

Bolts:

  • FeedParserBolt
    • Extracts URLs from RSS feeds (NB: Does not support ATOM)
  • FetcherBolt
    • Fetches URLs
    • Multi-threaded and queue based
    • Can mark sitemaps
    • Adheres to robots.txt
    • Configurable agent name
  • JSoupParserBolt
    • Uses JSoup to parse HTML (NB: Does not parse JavaScript)
    • Updates robots rules with directives found in page metadata
    • Finds links referenced via the href attribute of a tags
    • Filters (and potentially normalises) the links through a URL filter before emitting to the stream (see urlfilters.config.file)
    • Extracts text content via JSoup
    • Filters content through a parse filter before emitting to the stream (see parsefilters.config.file)
  • SimpleFetcherBolt
    • Fetches URLs
    • Single thread, no internal queues
    • Adheres to robots.txt
    • Can mark sitemaps
    • Configurable agent name
  • SiteMapParserBolt
    • Extract URLs from a sitemap
    • Applies parse filters
  • StatusEmitterBolt
    • Utility functions for bolts which emit to the status stream
  • URLFilterBolt
    • Filters incoming URLs (e.g. can be used after FileSpout to filter URLs read from the file)
    • Uses the same urlfilters.config.file as JSoupParserBolt
  • URLPartitionerBolt
    • Partitions URLs by hostname, domain or IP
    • URLs in a given partition will always be handled by the same Storm task to improve the efficiency of robots.txt handling
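To give a feel for what partitioning by hostname or domain means for URLPartitionerBolt, here is a rough sketch of deriving a partition key from a URL. It is not Storm Crawler's code: PartitionKeySketch and both methods are made-up names, and the "domain" logic is deliberately naive (it just keeps the last two labels of the hostname, which is wrong for suffixes such as co.uk and for IP addresses).

```java
import java.net.URI;
import java.util.Locale;

// Toy derivation of a partition key from a URL. Names are hypothetical.
final class PartitionKeySketch {

    // Key by hostname: every URL on the same host gets the same key.
    static String hostKey(String url) {
        String host = URI.create(url).getHost();
        return host == null ? "" : host.toLowerCase(Locale.ROOT);
    }

    // Naive "domain" key: the last two labels of the hostname.
    // A real implementation would need a public suffix list.
    static String naiveDomainKey(String url) {
        String host = hostKey(url);
        String[] labels = host.split("\\.");
        if (labels.length <= 2) {
            return host;
        }
        return labels[labels.length - 2] + "." + labels[labels.length - 1];
    }

    public static void main(String[] args) {
        String url = "https://blog.example.com/posts/1";
        System.out.println(hostKey(url));
        System.out.println(naiveDomainKey(url));
    }
}
```

Once every URL carries a key like this, a fields grouping on that key is what keeps all URLs of one host or domain on the same fetcher task.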

Spouts:

  • FileSpout
    • Reads URLs from the files in a directory which match a filter, both specified in constructorArgs
    • An optional boolean declares whether the URLs emitted will already be marked as DISCOVERED
  • MemorySpout
    • Provides a set of URLs from memory. These are passed in via the constructorArgs in crawler.flux
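The URL filtering step that sits after FileSpout (URLFilterBolt) and inside JSoupParserBolt is easier to picture as a chain of small functions, each of which either passes a URL on, rewrites it, or drops it. The sketch below is my own simplification, not Storm Crawler's URL filter API: UrlFilterChainSketch, the two example filters and the convention of returning null to drop a URL are all assumptions made for illustration.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy URL filter chain. Names and conventions are hypothetical.
final class UrlFilterChainSketch {

    // Keeps http and https URLs, drops everything else by returning null.
    static final UnaryOperator<String> DROP_NON_HTTP =
            url -> (url.startsWith("http://") || url.startsWith("https://")) ? url : null;

    // Normalises a URL by removing its fragment ("#...").
    static final UnaryOperator<String> STRIP_FRAGMENT = url -> {
        int i = url.indexOf('#');
        return i < 0 ? url : url.substring(0, i);
    };

    // Runs the URL through each filter in order; null means the URL was dropped.
    static String apply(List<UnaryOperator<String>> filters, String url) {
        String current = url;
        for (UnaryOperator<String> filter : filters) {
            current = filter.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = List.of(DROP_NON_HTTP, STRIP_FRAGMENT);
        System.out.println(apply(chain, "https://example.com/a#top"));
        System.out.println(apply(chain, "mailto:someone@example.com"));
    }
}
```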

AWS

Includes an indexer for CloudSearch and modules for using S3 as storage

Bolts:

Elasticsearch

Tools for using Elasticsearch as the index and document cache

Bolts:

  • DeletionBolt
    • Deletes documents from Elasticsearch
    • If connected to StatusUpdaterBolt, will delete documents with a status of ERROR
    • Will NOT delete documents which were indexed under the canonical URL
  • IndexerBolt
    • Sends documents to Elasticsearch and indexes the fields
  • StatusUpdaterBolt
    • Store the status of URLs in Elasticsearch

Spouts:

  • AggregationSpout
    • Pulls URLs from Elasticsearch
    • Aggregates by bucket key to mix the URLs
    • Single instance unless using 'es.status.routing' with StatusUpdaterBolt
      • In that case, requires exactly as many spout instances as Elasticsearch shards
  • CollapsingSpout
    • Collapses results using Elasticsearch field collapsing so that returned results are from a diverse set of buckets.
    • Single instance unless using 'es.status.routing' with StatusUpdaterBolt
      • In that case, requires exactly as many spout instances as Elasticsearch shards
  • HybridSpout
    • Extends AggregationSpout to query for all the URLs for a given host
    • Single instance unless using 'es.status.routing' with StatusUpdaterBolt
      • In that case, requires exactly as many spout instances as Elasticsearch shards
  • ScrollSpout
    • Copies all the documents from a shard to the status stream
    • Used to copy an index
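The "mix of URLs" that AggregationSpout aims for can be illustrated without Elasticsearch. The sketch below takes URLs already grouped by bucket key and returns at most a fixed number from each bucket, interleaved, so that no single host dominates the batch. It only mimics the effect: the real spout gets there with Elasticsearch aggregation queries, and BucketMixSketch and mix are names I invented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of mixing URLs across buckets. Names are hypothetical.
final class BucketMixSketch {

    // Takes up to maxPerBucket URLs from each bucket, round-robin across buckets,
    // preserving the order of URLs within each bucket.
    static List<String> mix(Map<String, List<String>> urlsByBucket, int maxPerBucket) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < maxPerBucket; i++) {
            for (List<String> urls : urlsByBucket.values()) {
                if (i < urls.size()) {
                    out.add(urls.get(i));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buckets = new LinkedHashMap<>();
        buckets.put("a.example", List.of("http://a.example/1", "http://a.example/2", "http://a.example/3"));
        buckets.put("b.example", List.of("http://b.example/1"));
        System.out.println(mix(buckets, 2));
    }
}
```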

Solr

Tools for using Solr as the index and document cache

Bolts:

Spouts:

  • SolrSpout
    • Retrieves documents from Solr and injects them into the topology
    • Can only have a single instance

SQL

Use an SQL database for the index and document cache

Bolts:

Spouts:

  • SQLSpout
    • Retrieves documents from a database and injects them into the topology
    • Can have multiple instances, each in charge of a separate bucket

Tika

Use Apache Tika for parsing documents. This can be used as a replacement for JSoupParserBolt, or alongside it, with JSoup handling the HTML and Tika handling the other document types (such as PDF).

Bolts:

  • ParserBolt
    • Collect text and metadata from a document using Tika
    • Skip trimmed documents
  • RedirectionBolt
    • Redirects documents to ParserBolt if the MIME type is not supported by JSoupParserBolt
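The split between JSoup and Tika boils down to a routing decision on the MIME type. Here is a minimal sketch of that decision. It is not the RedirectionBolt's code: ParserRoutingSketch, the Parser enum and the set of types treated as HTML are all assumptions for the sake of the example.

```java
import java.util.Locale;
import java.util.Set;

// Toy routing between an HTML parser and a general document parser.
// Names and the list of HTML types are hypothetical.
final class ParserRoutingSketch {

    enum Parser { JSOUP, TIKA }

    // MIME types this sketch treats as HTML (an assumption, not Storm Crawler's list).
    private static final Set<String> HTML_TYPES = Set.of("text/html", "application/xhtml+xml");

    // Picks a parser from a Content-Type value such as "text/html; charset=UTF-8".
    static Parser route(String contentType) {
        if (contentType == null) {
            return Parser.TIKA; // unknown type: let the general parser try
        }
        // Drop parameters such as "; charset=UTF-8" and normalise case
        String mime = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return HTML_TYPES.contains(mime) ? Parser.JSOUP : Parser.TIKA;
    }

    public static void main(String[] args) {
        System.out.println(route("text/html; charset=UTF-8"));
        System.out.println(route("application/pdf"));
    }
}
```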

WARC

Generating and consuming ISO 28500 Web Archives.

Bolts:

  • GzipHdfsBolt
    • Writes a gzipped stream with per record compression to HDFS
  • WARCHdfsBolt
    • Extends GzipHdfsBolt to write a compressed web archive to HDFS

Spouts:

  • WARCSpout
    • Read WARC files from a local filesystem or HDFS and inject them into the topology
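The "per record compression" mentioned for GzipHdfsBolt means each record is gzipped on its own and the results are simply concatenated. The sketch below shows that idea with plain JDK classes and in-memory buffers instead of HDFS. PerRecordGzipSketch and its methods are hypothetical names, and the records are arbitrary strings rather than real WARC records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Toy illustration of per-record gzip compression. Names are hypothetical.
final class PerRecordGzipSketch {

    // Compresses each record as its own gzip member and concatenates the members.
    static byte[] gzipEachRecord(List<String> records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            for (String record : records) {
                // Closing the gzip stream ends this member; closing a
                // ByteArrayOutputStream has no effect, so out stays usable.
                try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                    gz.write(record.getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Reads all concatenated gzip members back as one string.
    static String gunzipAll(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] archive = gzipEachRecord(List.of("first record\n", "second record\n"));
        System.out.print(gunzipAll(archive));
    }
}
```

The benefit of this layout is that a reader which knows a record's byte offset can decompress just that record without touching the rest of the file.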

Conclusion

Obviously, there’s a lot more to understand here, including things like configuration, text parsing and how to hand off requests to WebDriver. Those will come in other posts.