Tenderlove Making

Event based JSON and YAML parsing

Let’s use Ruby 1.9.2 and Psych to build an event-based Twitter stream parser. Psych is a YAML parser that I wrote, and it ships in the standard library as of 1.9.2. Eventually, it will replace the current YAML parser, but we can still use it today!

But you said YAML and JSON! wtf?

I know! In the YAML 1.2 spec, JSON is a subset of YAML. Psych supports YAML 1.1 right now, so much (but not all) JSON is supported. Once libyaml is upgraded to YAML 1.2, it will have full JSON support!
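To make the "JSON is (mostly) YAML" point concrete, here is a small sketch that hands a JSON string straight to Psych. The sample document is made up for illustration, and I've kept a space after each colon to stay well inside what YAML 1.1 accepts.

```ruby
require 'psych'

# A made-up JSON document; nothing here is Twitter specific.
json = '{"foo": "bar", "list": [1, 2, 3], "nested": {"ok": true, "none": null}}'

# Psych treats the JSON as a YAML flow mapping and builds plain Ruby objects.
data = Psych.load(json)

p data["foo"]     # the string value
p data["list"]    # an Array of Integers
p data["nested"]  # a nested Hash with true and nil
```

This loads the whole document into memory, which is exactly what the rest of this post avoids, but it is a quick way to check whether a given piece of JSON is digestible.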

Why do we want to do an event based parser?

Twitter streams are a never-ending flow of user status updates, and if we want a process to live forever consuming these updates, it would be nice if that process kept a low memory profile. Psych is built in such a way that we can hand it an IO object; it reads from that object and calls our callback methods as soon as it has parsed something, buffering as little as possible. If you are familiar with SAX-based XML parsing, this will feel familiar. Plus it is a fun problem!

Let’s start by writing an event listener for some sample JSON.

Event Listener

Our event listener is only going to listen for scalar events, meaning that when Psych parses a string, it will send that string to our listener. There are many different events that can happen, so Psych ships with a handler from which you can inherit. If you check out the source for the base class handler, you can see what types of events your handler can intercept.
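To get a feel for the other events before narrowing down to scalars, here is a rough sketch of a handler that records a few of them. The class name `EventRecorder` and the sample JSON are made up for illustration; the method names and arities are the ones declared on `Psych::Handler`.

```ruby
require 'psych'

# Hypothetical handler that remembers the order in which events arrive.
class EventRecorder < Psych::Handler
  attr_reader :events

  def initialize
    @events = []
  end

  def start_mapping(anchor, tag, implicit, style)
    @events << :start_mapping
  end

  def end_mapping
    @events << :end_mapping
  end

  def start_sequence(anchor, tag, implicit, style)
    @events << :start_sequence
  end

  def end_sequence
    @events << :end_sequence
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @events << [:scalar, value]
  end
end

recorder = EventRecorder.new
Psych::Parser.new(recorder).parse('{"user": {"name": "aaron"}, "tags": ["a", "b"]}')

recorder.events.each { |event| p event }
```

Any event we don't override falls through to the no-op in the base class, which is why a scalar-only listener is so short.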

For now, let’s write our scalar handler, and try it out.

~~~ ruby
require 'psych'

class Listener < Psych::Handler
  def scalar(value, anchor, tag, plain, quoted, style)
    puts value
  end
end

listener = Listener.new
parser = Psych::Parser.new listener
parser.parse DATA

__END__
{"foo":"bar"}
~~~

If you run this code, you should see the strings “foo” and “bar” printed.

In this example, our handler simply prints out every scalar value encountered. We create a new instance of the listener, pass that listener to a new instance of the parser, and tell the parser to parse DATA. We can hand the parser an IO object or a String object. This is important because we’d like to hand the parser our socket connection, so that the parser can deal with reading from the socket for us.
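Here is a quick sketch of the "IO or String" point. The `Collector` class is a made-up stand-in for our listener that stores values instead of printing them, and `StringIO` plays the part of the socket.

```ruby
require 'psych'
require 'stringio'

# Hypothetical listener that collects scalars so the two runs can be compared.
class Collector < Psych::Handler
  attr_reader :values

  def initialize
    @values = []
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @values << value
  end
end

source = '{"foo": "bar"}'

# Parse directly from a String.
from_string = Collector.new
Psych::Parser.new(from_string).parse(source)

# Parse from an IO-like object; the parser pulls data by calling read on it.
from_io = Collector.new
Psych::Parser.new(from_io).parse(StringIO.new(source))

p from_string.values
p from_io.values
```

Both runs should see the same scalars, which is what lets us swap a test string for a live connection later.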

Hooking up to Twitter

It would be convenient for us if Twitter’s stream were one continuous JSON document. Why? If it were, we could feed the socket straight to our JSON parser and start consuming events immediately. Unfortunately, Twitter’s stream is not so kind to us event-based consumers. We’ll need to trick our JSON parser into thinking the feed is one continuous document. We’ll get tricky with our data in a minute, but first let’s deal with authentication.

Authentication

Twitter requires us to authenticate before we can consume a feed. Stream authentication is done via Basic Auth. Let’s write a class that can authenticate and read from the stream. Once we do that, we’ll concentrate on parsing the stream.

require 'socket'

class StreamClient
  def initialize user, pass
    @ba = ["#{user}:#{pass}"].pack('m').chomp
  end

  def listen
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"

    # Read the headers
    while((line = socket.readline) != "\r\n"); puts line if $DEBUG; end

    # Consume the feed
    while line = socket.readline
      puts line
    end
  end
end

StreamClient.new(ARGV[0], ARGV[1]).listen

This class takes a username and password and calculates the Basic Auth credentials (the Base64-encoded “user:pass” string). When “listen” is called, it opens a connection, authenticates, reads the response headers, and starts consuming the feed.
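The Basic Auth value is just Base64 of “user:pass”, and it can be checked without touching the network. The sketch below uses the well-known “Aladdin” / “open sesame” example credentials from the HTTP authentication RFCs, not real ones. It also shows one caveat that I'm adding here as an aside: `pack('m')` wraps its output every 60 characters, so very long credentials would need `pack('m0')` to stay on one header line.

```ruby
# Example credentials from the HTTP Basic Auth RFC, not a real account.
user = "Aladdin"
pass = "open sesame"

# Same expression as in StreamClient#initialize.
encoded = ["#{user}:#{pass}"].pack('m').chomp
puts "Authorization: Basic #{encoded}"

# Decoding gives the original pair back.
decoded = encoded.unpack1('m')

# With long credentials, 'm' inserts line breaks mid-value; 'm0' does not.
long = "#{'u' * 50}:#{'p' * 50}"
wrapped   = [long].pack('m').chomp
unwrapped = [long].pack('m0')
```

For ordinary usernames and passwords the two forms give the same result, so the class above works as written.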

Processing the Feed

If we look at the output from the previous script, we’ll see that the Twitter stream looks something like this:

512
{"in_reply_to_screen_name":null,...}

419
{"in_reply_to_screen_name":"tenderlove"...}

This isn’t valid JSON. Instead, each update is a header (the number, written in hexadecimal) giving the length of the JSON chunk, then the JSON chunk itself, then a trailing “\r\n”. We would like the stream to look something like this:

---
{"in_reply_to_screen_name":null,...}
...
---
{"in_reply_to_screen_name":"tenderlove"...}
...

This chunk is two valid YAML documents. If the stream looked like this, we could feed it straight to our YAML processor no problem. How can we modify the stream to be suitable for our parser?
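One way to picture the transformation is as a small function from one IO to another. This is only a sketch under a few assumptions: `reframe` and `frame` are hypothetical helpers, the sample tweets are made up, and `StringIO` stands in for the socket so it can run offline.

```ruby
require 'psych'
require 'stringio'

# Hypothetical helper: wraps a JSON string the way the stream frames it,
# that is, hex length, CRLF, payload (JSON plus CRLF), CRLF.
def frame(json)
  payload = json + "\r\n"
  "#{payload.bytesize.to_s(16)}\r\n#{payload}\r\n"
end

# Hypothetical helper: copies each length-prefixed chunk from input to
# output, surrounded by YAML document start and end markers.
def reframe(input, output)
  until input.eof?
    length = input.readline.strip.to_i(16)  # header is a hex byte count
    break if length.zero?
    output.write "---\n"
    output.write input.read(length)
    output.write "...\n"
    input.read 2                            # skip the CRLF after the chunk
  end
end

raw = StringIO.new(frame('{"text": "hello"}') + frame('{"text": "world"}'))
out = StringIO.new
reframe(raw, out)

puts out.string
documents = Psych.load_stream(out.string)
p documents
```

The result is a multi-document YAML stream, so a single parser can walk through every update in turn.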

Fun with Thread and IO.pipe

If we create a pipe, we can have one thread process input from Twitter and feed it into the pipe. We can then give the other end of the pipe to our JSON processor and let it read from our processed feed. Let’s modify the “listen” method in our client to munge the feed into a pipe, and hand that off to our YAML processor. I only care about the text of people’s tweets, so let’s modify our listener too.
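The pipe-and-thread idea can be tried in isolation first. In this sketch the producer thread writes two made-up documents instead of reading from Twitter, and `Collector` is a hypothetical listener that stores scalars. Closing the write end is my addition so the demo terminates; the real stream never ends.

```ruby
require 'psych'

# Hypothetical listener that collects every scalar it sees.
class Collector < Psych::Handler
  attr_reader :values

  def initialize
    @values = []
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @values << value
  end
end

reader, writer = IO.pipe

# Producer: writes YAML-framed JSON documents into the pipe.
producer = Thread.new(writer) do |io|
  ['{"text": "first"}', '{"text": "second"}'].each do |json|
    io.write "---\n"
    io.write json
    io.write "\n...\n"
  end
  io.close # end of stream, so the parser can finish
end

# Consumer: the parser reads from the other end of the pipe.
collector = Collector.new
Psych::Parser.new(collector).parse(reader)

producer.join
reader.close

p collector.values
```

The parser only ever sees the read end of the pipe, so it has no idea that the data was rewritten on the way in.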

Here is our completed program:

~~~ ruby
require 'socket'
require 'psych'

class StreamClient
  def initialize user, pass
    @ba = ["#{user}:#{pass}"].pack('m').chomp
  end

  def listen listener
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"

    # Read the headers
    while((line = socket.readline) != "\r\n"); puts line if $DEBUG; end

    reader, writer = IO.pipe
    producer = Thread.new(socket, writer) do |s, io|
      loop do
        io.write "---\n"
        # The header line is the chunk length in hex
        io.write s.read(s.readline.strip.to_i(16))
        io.write "...\n"
        s.read 2 # strip the blank line
      end
    end

    parser = Psych::Parser.new listener
    parser.parse reader

    producer.join
  end
end

class Listener < Psych::Handler
  def initialize
    @was_text = false
  end

  def scalar value, anchor, tag, plain, quoted, style
    puts value if @was_text
    @was_text = value == 'text'
  end
end

StreamClient.new(ARGV[0], ARGV[1]).listen Listener.new
~~~

Great! In 30 lines, we’ve been able to provide an event based API for consuming Twitter streams. Were it not for the feed munging, we could reduce that by 9 lines!

Problems

So far, there have only been two problems for me with this script. The first is that we are forced to buffer the response from Twitter, but we cannot help that. The second is that sometimes the JSON emitted from Twitter is not parseable by Psych. I think this is just due to Psych only supporting YAML 1.1.

Conclusion

It’s true that we could have implemented this same interface without a pipe and a thread. Rather than munging the stream, we could create a new parser instance for each status update. But why create so many objects for parsing the stream when we only need one?
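For comparison, here is roughly what the parser-per-update alternative could look like. It is a sketch, not something I benchmarked: `next_chunk` and `frame` are hypothetical helpers, `TextCollector` is the same idea as our Listener but stores the text instead of printing it, and a `StringIO` with made-up tweets stands in for the socket.

```ruby
require 'psych'
require 'stringio'

# Same state machine as Listener, but collecting instead of printing.
class TextCollector < Psych::Handler
  attr_reader :texts

  def initialize
    @was_text = false
    @texts = []
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @texts << value if @was_text
    @was_text = value == 'text'
  end
end

# Hypothetical helper: frames JSON as hex length, CRLF, payload, CRLF.
def frame(json)
  payload = json + "\r\n"
  "#{payload.bytesize.to_s(16)}\r\n#{payload}\r\n"
end

# Hypothetical helper: returns the next chunk body, or nil at end of input.
def next_chunk(input)
  return nil if input.eof?
  length = input.readline.strip.to_i(16)
  return nil if length.zero?
  chunk = input.read(length)
  input.read 2 # skip the CRLF after the chunk
  chunk
end

stream = StringIO.new(
  frame('{"text": "hello", "id": 1}') + frame('{"id": 2, "text": "world"}')
)

collector = TextCollector.new
while (chunk = next_chunk(stream))
  # A fresh parser for every status update, all sharing one listener.
  Psych::Parser.new(collector).parse(chunk)
end

p collector.texts
```

No pipe and no thread, at the cost of one parser object per tweet and of holding each whole chunk in memory before parsing it.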

Anyway, have fun playing with this code, and I encourage you to try out Ruby 1.9.2. I think it’s really fun! PEW PEW PEW! HAPPY SATURDAY!
