One of the projects I'm working on involves scraping information from Amazon product listings. It currently runs on Delayed Job, but we've had problems with long-running worker processes dying off.
To deal with this, we are considering a switch to Amazon's Simple Workflow Service (SWF). SWF keeps the task queue on Amazon's servers, and our worker programs poll it for tasks.
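In this model nothing pushes work to the workers. Each worker keeps asking the service for the next task and handles whatever it gets. As a rough illustration only, here is a toy in-process version that uses Ruby's built-in `Queue` in place of SWF. The worker loop, the `:stop` sentinel and the ASIN strings are made up for this sketch and are not part of the SWF API.

```ruby
# Toy, in-process model of "central task queue + polling workers".
# Ruby's Queue stands in for SWF here; this is NOT the SWF API.
require "thread"

TASKS = Queue.new
RESULTS = Queue.new

# Each worker loops: block until a task is available, process it,
# and exit when it receives the :stop sentinel.
def start_worker(id)
  Thread.new do
    loop do
      task = TASKS.pop
      break if task == :stop
      RESULTS << "worker #{id} handled #{task}"
    end
  end
end

workers = (1..2).map { |id| start_worker(id) }

# The producer only enqueues work; it never calls a worker directly.
%w[B000TEST01 B000TEST02 B000TEST03].each { |asin| TASKS << asin }
# One sentinel per worker, queued after the real tasks (FIFO order).
workers.size.times { TASKS << :stop }
workers.each(&:join)

handled = []
handled << RESULTS.pop until RESULTS.empty?
puts handled.size
```

Which worker handles which ASIN depends on thread scheduling. Only the total is fixed.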
For this article I'll build a simplified version of the application that only pulls down prices. The data structures for that are pretty straightforward:
rails new swf_scraper
rails generate scaffold Product asin:string
rails generate scaffold Record price:float product_id:integer
Connect the records to the products by adding has_many :records to the Product model and belongs_to :product to the Record model.
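Accessing the workflow service requires adding the aws-sdk and aws-flow gems to the Gemfile. The scraping code also uses HTTParty and Nokogiri, so the httparty and nokogiri gems need to be there too.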
SWF uses workflows to define the order in which activities run. The workflow for this project is:
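class ScrapeWorkflow
  extend AWS::Flow::Workflows

  workflow :queue_scrape do
    {
      :version => "1.1",
      :task_list => SWF_WORKFLOW_TASK_LIST,
      # The whole execution must finish within 10 minutes (SWF timeouts are in seconds).
      :execution_start_to_close_timeout => 10 * 60,
    }
  end

  # Client used to schedule ScrapeActivity tasks from this workflow.
  activity_client(:activity) { {:from_class => "ScrapeActivity"} }

  def queue_scrape(asin)
    # send_async schedules the scrape activity and returns a future
    # immediately instead of blocking on the result.
    scrape_future = activity.send_async(:scrape, asin)
    # wait_for_all(scrape_future)
  end
end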
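In the workflow, `send_async` returns a future right away rather than the scrape result. As I understand aws-flow, the commented-out `wait_for_all` line would make `queue_scrape` wait until that future is set. The sketch below is only a plain-Ruby analogy of that difference: threads stand in for futures, and `fake_scrape` and the ASINs are hypothetical placeholders, not part of aws-flow.

```ruby
# Plain-Ruby analogy only: Thread stands in for an aws-flow Future.
# This is NOT the aws-flow API.

# Hypothetical stand-in for the scrape activity: simulate slow work.
def fake_scrape(asin)
  sleep 0.05
  "#{asin}: 9.99"
end

asins = %w[B000TEST01 B000TEST02]

# Like send_async: start each task and immediately get a handle back.
handles = asins.map { |asin| Thread.new { fake_scrape(asin) } }
puts "dispatched #{handles.size} tasks without waiting"

# Like wait_for_all: block until every handle has produced its value.
results = handles.map(&:value)
puts results.inspect
```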
The workflow has a single step: scrape the ASIN. The activity is where the actual processing happens:
class ScrapeActivity
  extend AWS::Flow::Activities

  activity :scrape do
    {
      :version => "1.1",
      :default_task_list => SWF_ACTIVITY_TASK_LIST,
      # A task may wait up to 10 minutes for a worker, then has 30 seconds to run.
      :default_task_schedule_to_start_timeout => 10 * 60,
      :default_task_start_to_close_timeout => 30,
    }
  end

  def initialize
    @count = 0
  end

  def scrape(asin)
    @count += 1
    # ASINs are alphanumeric, so the URL needs no extra encoding.
    response = HTTParty.get("http://www.amazon.com/dp/#{asin}")
    doc = Nokogiri::HTML(response.body)

    # Try the .priceLarge element first, then fall back to the alternative price markup.
    price = extract_price(doc, '.priceLarge') ||
            extract_price(doc, '.a-color-price.a-size-large')

    if price
      product = Product.find_by_asin(asin)
      product.records.create(price: price)
    end
    puts "#{@count} Scraped: #{asin}: #{price}"
  rescue => e
    puts "Error: #{e.message}"
  end

  private

  # First number in the element's text as a Float (commas stripped),
  # or nil if the element or the number is missing.
  def extract_price(doc, selector)
    node = doc.at_css(selector)
    return nil if node.nil?
    number = node.text[/[0-9.,]+/]
    number && number.delete(',').to_f
  end
end
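The price rule in `scrape` is easier to check in isolation from Nokogiri and the network. Here is a minimal sketch of the same regex-and-strip-commas logic as a plain method. `parse_price` is a hypothetical name, and the sample strings are made up rather than taken from real Amazon pages.

```ruby
# Same rule as the activity: take the first run of digits, dots and commas,
# drop the commas, convert to Float. Returns nil when there is no number.
def parse_price(text)
  return nil if text.nil?
  number = text[/[0-9.,]+/]
  number && number.delete(",").to_f
end

puts parse_price("$1,299.99").inspect              # 1299.99
puts parse_price("$24.50").inspect                 # 24.5
puts parse_price("Currently unavailable").inspect  # nil
```

Testing the rule this way shows a limitation. The pattern matches the *first* run of digits, dots or commas, so text with a stray period before the amount (say `"Sale. $5"`) would parse as `0.0`. The scraper relies on the CSS selectors picking out an element whose text starts with the price.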
The last piece of the puzzle is queuing the jobs and running the workflow and activity workers. This is done with a set of rake tasks:
require "#{Rails.root}/app/helpers/application_helper"
include ApplicationHelper
# Load aws-flow through the load path instead of a hard-coded gem directory.
require "aws/decider"
require "#{Rails.root}/config/initializers/swf.rb"
require "#{Rails.root}/lib/scrape_activity.rb"
require "#{Rails.root}/lib/scrape_workflow.rb"

namespace :swf do
  desc 'Start activity worker'
  task :activity => :environment do
    swf, domain = swf_domain
    # Polls the activity task list; :use_forking => false runs activities in this process.
    activity_worker = AWS::Flow::ActivityWorker.new(swf.client, domain, SWF_ACTIVITY_TASK_LIST, ScrapeActivity) { {:use_forking => false} }
    activity_worker.start
  end

  desc 'Start workflow worker'
  task :workflow => :environment do
    swf, domain = swf_domain
    # Polls the workflow task list and makes the decisions for ScrapeWorkflow.
    worker = AWS::Flow::WorkflowWorker.new(swf.client, domain, SWF_WORKFLOW_TASK_LIST, ScrapeWorkflow)
    worker.start
  end

  desc 'Queue scrape workflows'
  task :scrape => :environment do
    swf, domain = swf_domain
    workflow_client = AWS::Flow::workflow_client(swf.client, domain) { {:from_class => "ScrapeWorkflow"} }
    # Start one workflow execution per product.
    Product.all.each do |product|
      workflow_client.start_execution(product.asin)
    end
  end
end
The SWF client and domain are set up by a helper method:
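module ApplicationHelper
  # Returns the SWF client and the domain, registering the domain on first use.
  def swf_domain
    @swf = AWS::SimpleWorkflow.new
    begin
      # "10" is the workflow execution retention period, in days.
      @domain = @swf.domains.create(SWF_DOMAIN, "10")
    rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault
      # The domain was registered on an earlier run; look it up instead.
      @domain = @swf.domains[SWF_DOMAIN]
    end
    return @swf, @domain
  end
end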
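The helper relies on SWF rejecting a second registration of the same domain. It tries the create and falls back to a lookup when `DomainAlreadyExistsFault` is raised. Here is that create-or-look-up pattern in isolation, as a sketch against a made-up in-memory registry. `DomainRegistry`, `DomainExists` and `find_or_create_domain` are hypothetical and not part of aws-sdk.

```ruby
# Hypothetical stand-ins for the SWF domain collection and its
# "already exists" error; NOT part of aws-sdk.
class DomainExists < StandardError; end

class DomainRegistry
  def initialize
    @domains = {}
  end

  # Like registering a domain: fails if the name is already taken.
  def create(name, retention_days)
    raise DomainExists, name if @domains.key?(name)
    @domains[name] = { name: name, retention_days: retention_days }
  end

  # Look up an existing domain by name.
  def [](name)
    @domains.fetch(name)
  end
end

# Same shape as swf_domain: try to create, fall back to lookup on "exists".
def find_or_create_domain(registry, name)
  registry.create(name, 10)
rescue DomainExists
  registry[name]
end

registry = DomainRegistry.new
first = find_or_create_domain(registry, "swf_scraper")
second = find_or_create_domain(registry, "swf_scraper")
puts first.equal?(second)  # true
```

Another option is to check whether the domain exists before creating it. Trying the create and rescuing the error avoids a window where two processes both see the domain as missing and both try to register it.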