Cassandra and EventMachine

I spent this past weekend adding EventMachine support to the Cassandra gem. We’re using Cassandra at OneSpot as our next-gen data store and need EM support. The maintainers were nice enough to pull my changes yesterday, so the next release of the thrift_client and cassandra gems should work in EM. You just need to do this:

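    require 'cassandra'
    require 'thrift_client/event_machine'

    EM.run do
      # The EM transport suspends the current fiber while it waits on I/O,
      # so every Cassandra call has to run inside a Fiber.
      Fiber.new do
        @twitter = Cassandra.new('Twitter', '127.0.0.1:9160',
                                 :transport => Thrift::EventMachineTransport,
                                 :transport_wrapper => nil)
        @twitter.clear_keyspace!
        EM.stop
      end.resume
    end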

The key is the :transport and :transport_wrapper options which override the default, Socket-based implementation. Like all of my EventMachine code, this requires Ruby 1.9.
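For readers wondering how a blocking-looking API can sit on top of an event loop, here is a minimal sketch of the general fiber trick in plain Ruby. It is not the gem's actual code: `TinyReactor`, `async_fetch` and `fetch` are hypothetical names, and the queue of callbacks only stands in for the EM reactor.

```ruby
require 'fiber' # provides Fiber.current on Ruby 1.9

# Hypothetical stand-in for the reactor: a queue of pending callbacks.
class TinyReactor
  def initialize
    @pending = []
  end

  # Schedule a block to run on a later loop iteration.
  def later(&block)
    @pending << block
  end

  # Run scheduled blocks until none remain.
  def run
    @pending.shift.call until @pending.empty?
  end
end

# Hypothetical callback-style operation: the result arrives asynchronously.
def async_fetch(reactor, key, &callback)
  reactor.later { callback.call("value-for-#{key}") }
end

# Blocking-looking wrapper: park the current fiber until the callback fires,
# then hand the callback's value back as the return value.
def fetch(reactor, key)
  fiber = Fiber.current
  async_fetch(reactor, key) { |value| fiber.resume(value) }
  Fiber.yield
end

reactor = TinyReactor.new
results = []
reactor.later do
  Fiber.new do
    results << fetch(reactor, 'a') # reads like a blocking call
    results << fetch(reactor, 'b')
  end.resume
end
reactor.run
```

The caller of `fetch` never sees a callback, which is why the existing blocking API of the gems can stay unchanged while the loop keeps running underneath.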

6 thoughts on “Cassandra and EventMachine”

  1. Correct me if I am wrong, but looking at this, it seems to be a blocking implementation, that is using a separate thread to communicate with Cassandra.

    Are there any Cassandra protocols for EventMachine that can run within the reactor thread? Is that a silly thing to expect with Cassandra? (I am new to both Cassandra and EventMachine.)

  2. Blair, look at the Cassandra and thrift_client gems. Their APIs are blocking – there’s no way the gems could be used with the more traditional EM callback style. You can certainly have a callback-based Cassandra client but you would need to start from scratch and develop a new API.

  3. I have been investigating using Cassandra, Sinatra, EventMachine and Fibers for the purposes of building a highly scalable and distributed API. I have fully enjoyed and learned quite a bit from your blog and presentations, but I am finding it difficult to develop a Sinatra/evented application that pools Cassandra connections. Can you provide a more detailed example that illustrates this?

  4. Hi Mike et al.,

    I’m relatively new to Cassandra and EventMachine so I have a lot to learn ;-) My goal is really simple. I want to read a large file (120 MB) line by line and insert every row into a Cassandra DB. Sounds simple, but I’ve been trying for 2 days now :-(

    #!/usr/bin/env ruby

    require 'rubygems'
    require 'cassandra'
    require 'eventmachine'
    require 'thrift_client/event_machine'

    EM.run do
      Fiber.new do
        rm = Cassandra.new('RankMetrics', "127.0.0.1:9160", :transport => Thrift::EventMachineTransport, :transport_wrapper => nil)
        rm.clear_keyspace!
        file = File.open("us_100000.txt")
        read_chunk = proc do
          10.times do
            if line = file.gets
              rm.insert(:Domains, "#{line.downcase}", {'domain' => "#{line}"})
            else
              EM.stop
            end
          end
          EM.next_tick(read_chunk)
        end
        EM.next_tick(read_chunk)
      end.resume
    end

    AFAIK EventMachine doesn’t support asynchronous fs read/write, right? So I tried it with chunks of 10 lines per tick, but the code seems to block. I’m at my wit’s end. Do you have a hint for me about what’s going wrong there?

    Cheerio,
    Chris

    1. This is not the right venue for detailed technical help. One core issue with your impl is that you are still just doing one line at a time (because you only create a single fiber). Why bother with EM + Fibers if you are still basically single threaded?
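
To illustrate the point about a single fiber, here is a small sketch in plain Ruby that compares one worker fiber with two. Everything in it is hypothetical: `PENDING` stands in for the reactor's callback queue and `insert` stands in for a non-blocking Cassandra write. In a real program each fiber would likely also need its own Cassandra client, which is an assumption this sketch does not model.

```ruby
require 'fiber' # provides Fiber.current on Ruby 1.9

# Hypothetical stand-in for the EM reactor: a FIFO of pending callbacks.
PENDING = []

# Hypothetical non-blocking insert: logs the request, then completes on a
# later loop iteration by resuming the fiber that issued it.
def insert(log, line)
  fiber = Fiber.current
  log << "start #{line}"
  PENDING << proc { log << "done #{line}"; fiber.resume }
  Fiber.yield
end

# Share the lines between `workers` fibers and run the loop to completion.
def load_lines(lines, workers)
  log = []
  queue = lines.dup
  workers.times do
    Fiber.new do
      while (line = queue.shift)
        insert(log, line)
      end
    end.resume
  end
  PENDING.shift.call until PENDING.empty?
  log
end

one  = load_lines(%w[a b c d], 1) # each insert finishes before the next starts
many = load_lines(%w[a b c d], 2) # two inserts are in flight at once
```

With one fiber the log strictly alternates start and done, so only one request is ever outstanding. With two fibers, a second request starts before the first completes, which is where the concurrency comes from.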
