Let’s use Ruby 1.9.2 and Psych to build an event-based Twitter stream parser. Psych is a YAML parser that I wrote, and it’s in the standard library in 1.9.2. Eventually it will replace the current YAML parser, but we can use it today!
But you said YAML and JSON! wtf?
I know! As of the YAML 1.2 spec, JSON is a subset of YAML. Psych is built on libyaml, which supports YAML 1.1 right now, so much (but not all) JSON is supported. Once libyaml is upgraded to YAML 1.2, Psych will have full JSON support!
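Because libyaml already accepts much of JSON, plain JSON text can often be handed straight to Psych’s loader. A quick sketch, assuming a current Ruby where `require 'psych'` loads Psych (the sample JSON is made up):

```ruby
require 'psych'

# JSON text, parsed by a YAML parser
json = '{"name": "tenderlove", "tags": ["ruby", "yaml"], "count": 2}'
doc  = Psych.load(json)

p doc["tags"]   # => ["ruby", "yaml"]
p doc["count"]  # => 2 (an Integer, not a String)
```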
Why do we want an event-based parser?
Twitter streams are a never-ending flow of user status updates. If we want a process to live forever consuming these updates, it should keep a low memory profile. Psych is built so that we can hand it an IO object: it reads from that IO and calls our callback methods as soon as possible, buffering as little as it can along the way. If you’ve done SAX-based XML parsing, this style will look familiar. Plus, it’s a fun problem!
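To make the low-memory idea concrete, here is a minimal sketch, assuming a current Ruby with Psych. `CountingHandler` is a hypothetical name: it keeps two integers and drops every scalar as soon as it arrives, so what it holds doesn’t grow with the number of documents it sees. A `StringIO` stands in for a real socket.

```ruby
require 'psych'
require 'stringio'

# Keeps only counters; scalar values are not stored anywhere.
class CountingHandler < Psych::Handler
  attr_reader :documents, :scalars

  def initialize
    @documents = 0
    @scalars   = 0
  end

  # Called once at the end of each YAML document
  def end_document(implicit)
    @documents += 1
  end

  # Called for every string Psych parses; we only count it
  def scalar(value, anchor, tag, plain, quoted, style)
    @scalars += 1
  end
end

# 1,000 small JSON documents in one YAML stream, read through an IO
input = StringIO.new((1..1000).map { |i| %(--- {"id": #{i}}\n) }.join)

handler = CountingHandler.new
Psych::Parser.new(handler).parse(input)
puts "#{handler.documents} documents, #{handler.scalars} scalars"
# prints "1000 documents, 2000 scalars"
```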
Let’s start by writing an event listener for some sample JSON.
Event Listener
Our event listener will only listen for scalar events: whenever Psych parses a string, it sends that string to our listener. Many other kinds of events can happen, so Psych ships with a base handler class, Psych::Handler, that you can inherit from. If you check out the source for Psych::Handler, you can see which events your handler can intercept.
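To get a feel for those other events, here is a small sketch of a handler that records the name of every callback Psych invokes, in order. `EventLogger` is a hypothetical name, and the structural callbacks accept any arguments (`*_args`) because only the event name is kept.

```ruby
require 'psych'

# Records every event Psych sends, in the order it sends them.
class EventLogger < Psych::Handler
  attr_reader :events

  def initialize
    @events = []
  end

  # Structural events: record just the callback's name.
  %w[start_stream start_document end_document
     start_mapping end_mapping start_sequence end_sequence end_stream].each do |name|
    define_method(name) { |*_args| @events << name }
  end

  # Scalar events also carry the parsed string.
  def scalar(value, anchor, tag, plain, quoted, style)
    @events << "scalar(#{value})"
  end
end

logger = EventLogger.new
Psych::Parser.new(logger).parse('{"foo": ["bar", 1]}')
puts logger.events
```

For that input the logger should record start_stream, start_document, start_mapping, scalar(foo), start_sequence, scalar(bar), scalar(1), end_sequence, end_mapping, end_document, and end_stream.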
For now, let’s write our scalar handler, and try it out.
require 'psych'
class Listener < Psych::Handler
  def scalar(value, anchor, tag, plain, quoted, style)
    puts value
  end
end
listener = Listener.new
parser = Psych::Parser.new listener
parser.parse DATA
__END__
{"foo":"bar"}
If you run this code, you should see the strings “foo” and “bar” printed.
In this example, our handler simply prints every scalar value it encounters. We create a new listener instance, pass that listener to a new parser instance, and tell the parser to parse DATA (the text after __END__). We can hand the parser either an IO object or a String. The IO support matters because we’d like to hand the parser our socket connection, so that the parser can handle reading from the socket for us.
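A quick way to convince yourself that both input types behave the same: parse the same JSON once as a String and once through a `StringIO` (which Psych reads via `#read`, just as it would a socket). `ScalarCollector` is a hypothetical variant of the listener that stores values instead of printing them.

```ruby
require 'psych'
require 'stringio'

# Collects scalar values instead of printing them.
class ScalarCollector < Psych::Handler
  attr_reader :values

  def initialize
    @values = []
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @values << value
  end
end

json = '{"foo": "bar"}'

from_string = ScalarCollector.new
Psych::Parser.new(from_string).parse(json)             # String input

from_io = ScalarCollector.new
Psych::Parser.new(from_io).parse(StringIO.new(json))   # IO input

p from_string.values == from_io.values  # => true
```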
Hooking up to Twitter
It would be convenient if Twitter’s stream were one continuous JSON document. Why? If it were, we could feed the socket straight to our JSON parser and start consuming events immediately. Unfortunately, Twitter’s stream isn’t so kind to us event-based consumers, so we’ll need to trick our JSON parser into thinking the feed is one continuous document. We’ll get tricky with the data in a minute, but first let’s deal with authentication.
Authentication
Twitter requires us to authenticate before we can consume a feed. Stream authentication is done via Basic Auth. Let’s write a class that can authenticate and read from the stream. Once we do that, we’ll concentrate on parsing the stream.
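Basic Auth credentials are just the Base64 encoding of `user:pass`, sent in an `Authorization: Basic …` header. A small sketch of building that header value; `basic_auth_header` is a hypothetical helper, not part of the client. It uses `pack('m0')`, strict Base64 with no line feeds, because plain `pack('m')` wraps its output every 60 characters, which would corrupt the header for long credentials.

```ruby
# Hypothetical helper: builds the value of an HTTP Basic Auth header.
def basic_auth_header(user, pass)
  # 'm0' = strict Base64: no line feeds are inserted
  "Basic #{["#{user}:#{pass}"].pack('m0')}"
end

puts basic_auth_header('user', 'pass')   # => Basic dXNlcjpwYXNz

# With 'm', long input gets a line feed in the middle, even after chomp
long = "a" * 50
p ["#{long}:#{long}"].pack('m').chomp.include?("\n")   # => true
p ["#{long}:#{long}"].pack('m0').include?("\n")        # => false
```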
require 'socket'
class StreamClient
  def initialize user, pass
    # Base64-encode "user:pass"; 'm0' avoids the line feeds 'm' puts in long output
    @ba = ["#{user}:#{pass}"].pack('m0')
  end
  def listen
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"
    # Skip the response headers, which end at the first blank line
    while (line = socket.readline) != "\r\n"; puts line if $DEBUG; end
    # Print the body as it arrives (readline raises EOFError if the stream ends)
    while (line = socket.readline)
      puts line
    end
  end
end
StreamClient.new(ARGV[0], ARGV[1]).listen
This class takes a username and password and computes the Basic Auth credentials (the Base64-encoded “user:pass” string). When “listen” is called, it opens a connection, sends the request with the Authorization header, reads past the response headers, and starts consuming the feed.
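The client throws the response headers away, but it can be handy to keep them, for example to check the status line before trusting the body. A minimal sketch under that assumption: `read_response_head` is a hypothetical helper, and the `StringIO` response below is made-up sample data standing in for the socket.

```ruby
require 'stringio'

# Hypothetical helper: reads an HTTP/1.1 status line and headers from +io+,
# stopping at the blank "\r\n" line that separates them from the body.
def read_response_head(io)
  status  = io.readline("\r\n").chomp("\r\n")
  headers = {}
  while (line = io.readline("\r\n")) != "\r\n"
    name, value = line.chomp("\r\n").split(":", 2)
    headers[name.downcase] = value.strip
  end
  [status, headers]
end

response = StringIO.new(
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: application/json\r\n" \
  "Transfer-Encoding: chunked\r\n" \
  "\r\n" \
  "5\r\nhello\r\n"
)

status, headers = read_response_head(response)
puts status                          # => HTTP/1.1 200 OK
puts headers["transfer-encoding"]    # => chunked
# response is now positioned at the start of the body
```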
Processing the Feed
If we look at the output from the previous script, we’ll see that the Twitter stream looks something like this:
512
{"in_reply_to_screen_name":null,...}
419
{"in_reply_to_screen_name":"tenderlove"...}
That isn’t valid JSON. The response uses HTTP chunked transfer encoding: each chunk starts with a header line giving the chunk’s length in hexadecimal (the number), followed by the JSON chunk, then a trailing “\r\n”. We would like the stream to look something like this:
---
{"in_reply_to_screen_name":null,...}
...
---
{"in_reply_to_screen_name":"tenderlove"...}
...
That’s a stream of two valid YAML documents. If Twitter’s stream looked like this, we could feed it straight to our YAML processor with no problem. How can we modify the stream to suit our parser?
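Before adding threads and pipes, the rewrite itself can be tried on a string: read each chunk (hex size line, data, trailing CRLF), wrap it in `---` / `...` markers, and let Psych load the resulting multi-document stream. A sketch, assuming a reasonably recent Psych that provides `Psych.load_stream`; `each_chunk` is a hypothetical helper and the statuses are made up.

```ruby
require 'psych'
require 'stringio'

# Hypothetical helper: yields the payload of each chunk in a chunked body:
#   <size in hex>\r\n<size bytes of data>\r\n ... 0\r\n\r\n
def each_chunk(io)
  loop do
    size = io.readline.strip.to_i(16)   # to_i(16) stops at any ";extension"
    break if size.zero?                 # a zero-size chunk ends the body
    data = io.read(size)
    io.read(2)                          # discard the CRLF after the data
    yield data
  end
end

# Made-up statuses, framed the way the stream frames them
statuses = [%({"in_reply_to_screen_name": null, "text": "first"}\r\n),
            %({"in_reply_to_screen_name": "tenderlove", "text": "second"}\r\n)]
body = statuses.map { |s| "#{s.bytesize.to_s(16)}\r\n#{s}\r\n" }.join + "0\r\n\r\n"

# Wrap every chunk in explicit YAML document markers
chunks = []
each_chunk(StringIO.new(body)) { |json| chunks << json }
yaml = chunks.map { |json| "---\n#{json}...\n" }.join

docs = Psych.load_stream(yaml)
docs.each { |doc| puts doc["text"] }   # prints "first", then "second"
```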
Fun with Thread and IO.pipe
If we create a pipe, we can have one thread process input from Twitter and feed the result into the pipe. We can then give the other end of the pipe to our JSON processor and let it read from our processed feed. Let’s modify the “listen” method in our client to munge the feed into a pipe and hand the pipe’s read end to our YAML processor. I only care about the text of people’s tweets, so let’s modify our listener too.
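The producer/consumer plumbing can be exercised offline by swapping the socket for a `StringIO` holding a chunked body. A minimal sketch under that assumption: `TextCollector` and `parse_chunked` are hypothetical names, and the statuses are made up. The producer closes the write end in an `ensure` so the parser reading the other end sees EOF and returns.

```ruby
require 'psych'
require 'stringio'

# Collects the scalar that follows each "text" key.
class TextCollector < Psych::Handler
  attr_reader :texts

  def initialize
    @texts = []
    @was_text = false
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @texts << value if @was_text
    @was_text = value == 'text'
  end
end

# Producer thread: chunked body -> YAML documents on the pipe.
# Consumer (this thread): Psych parses the read end of the pipe.
def parse_chunked(source, handler)
  reader, writer = IO.pipe
  producer = Thread.new do
    begin
      loop do
        size = source.readline.strip.to_i(16)   # chunk size is hex
        break if size.zero?                     # terminating chunk
        writer.write "---\n"
        writer.write source.read(size)
        writer.write "...\n"
        source.read(2)                          # CRLF after the chunk data
      end
    ensure
      writer.close                              # EOF lets the parser finish
    end
  end
  Psych::Parser.new(handler).parse(reader)
  producer.join
  reader.close
end

statuses = [%({"text": "hello"}\r\n), %({"text": "world"}\r\n)]
fake_socket = StringIO.new(
  statuses.map { |s| "#{s.bytesize.to_s(16)}\r\n#{s}\r\n" }.join + "0\r\n\r\n"
)

collector = TextCollector.new
parse_chunked(fake_socket, collector)
p collector.texts   # => ["hello", "world"]
```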
Here is our completed program:
require 'socket'
require 'psych'
class StreamClient
  def initialize user, pass
    @ba = ["#{user}:#{pass}"].pack('m0')
  end
  def listen listener
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"
    # Read the headers
    while (line = socket.readline) != "\r\n"; puts line if $DEBUG; end
    reader, writer = IO.pipe
    # Producer: turn each HTTP chunk into its own YAML document on the pipe
    producer = Thread.new(socket, writer) do |s, io|
      begin
        loop do
          io.write "---\n"
          io.write s.read(s.readline.strip.to_i(16)) # chunk size is hex
          io.write "...\n"
          s.read 2 # strip the CRLF that ends the chunk
        end
      ensure
        io.close # EOF on the pipe lets parser.parse return if the socket dies
      end
    end
    # Consumer: Psych reads the other end of the pipe and fires events
    parser = Psych::Parser.new listener
    parser.parse reader
    producer.join
  end
end
class Listener < Psych::Handler
  def initialize
    @was_text = false
  end
  # Print the scalar that follows each "text" key
  def scalar value, anchor, tag, plain, quoted, style
    puts value if @was_text
    @was_text = value == 'text'
  end
end
StreamClient.new(ARGV[0], ARGV[1]).listen Listener.new
Great! In roughly 30 lines, we’ve built an event-based API for consuming Twitter streams. Were it not for the feed munging, it would be shorter still!
Problems
So far, I’ve run into only two problems with this script. The first is that we’re forced to buffer Twitter’s response (the producer reads each whole chunk before writing it to the pipe), but we can’t help that. The second is that Psych sometimes can’t parse the JSON Twitter emits. I suspect this is just because Psych only supports YAML 1.1.
Conclusion
It’s true that we could have implemented this same interface without a pipe and a thread. Rather than munging the stream, we could create a new parser instance for each status update. But why create so many objects for parsing the stream when we only need one?
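The per-status alternative has one upside worth noting: it can help with the second problem above. If each status gets its own parser and handler, a status Psych can’t parse can be skipped instead of ending the whole stream. A minimal sketch, with a `StringIO` standing in for the socket; `TextCollector` and `collect_texts` are hypothetical names, and the deliberately broken status is made up.

```ruby
require 'psych'
require 'stringio'

# Collects the scalar that follows each "text" key.
class TextCollector < Psych::Handler
  attr_reader :texts

  def initialize
    @texts = []
    @was_text = false
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @texts << value if @was_text
    @was_text = value == 'text'
  end
end

# No pipe, no thread: one fresh handler and parser per chunk.
def collect_texts(source)
  texts = []
  loop do
    size = source.readline.strip.to_i(16)
    break if size.zero?
    chunk = source.read(size)
    source.read(2)                      # discard the chunk's trailing CRLF
    collector = TextCollector.new
    begin
      Psych::Parser.new(collector).parse(chunk)
      texts.concat(collector.texts)     # keep results only from clean parses
    rescue Psych::SyntaxError
      # Psych could not parse this status; skip it and keep reading
    end
  end
  texts
end

statuses = [%({"text": "hello"}\r\n),
            %({"text": "oops"\r\n),     # unterminated mapping: invalid
            %({"text": "world"}\r\n)]
body = statuses.map { |s| "#{s.bytesize.to_s(16)}\r\n#{s}\r\n" }.join + "0\r\n\r\n"

texts = collect_texts(StringIO.new(body))
p texts   # => ["hello", "world"]
```

The trade-off is the one described above: a new parser object per status, in exchange for the ability to recover from a bad one.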
Anyway, have fun playing with this code, and I encourage you to try out Ruby 1.9.2. I think it’s really fun! PEW PEW PEW! HAPPY SATURDAY!