Event based JSON and YAML parsing
Apr 17, 2010 @ 4:41 pm
Let’s use Ruby 1.9.2 and Psych to build an event based Twitter stream parser. Psych is a YAML parser that I wrote and is in the standard library in 1.9.2. Eventually, it will replace the current YAML parser, but we can still use it today!
But you said YAML and JSON! wtf?
I know! In the YAML 1.2 spec, JSON is a subset of YAML. Psych supports YAML 1.1 right now, so much (but not all) JSON is supported. Once libyaml is upgraded to YAML 1.2, it will have full JSON support!
Let’s start by writing an event listener for some sample JSON.
For now, let’s write our scalar handler, and try it out.
require 'psych'

class Listener < Psych::Handler
  # Called once for every scalar (string, number, etc.) the parser sees
  def scalar(value, anchor, tag, plain, quoted, style)
    puts value
  end
end

listener = Listener.new
parser = Psych::Parser.new listener
parser.parse DATA # DATA is everything after __END__

__END__
{"foo":"bar"}
If you run this code, you should see the strings “foo” and “bar” printed.
In this example, our handler simply prints out every scalar value encountered. We create a new instance of the listener, pass that listener to a new instance of the parser, and tell the parser to parse DATA. We can hand the parser an IO object or a String object. This is important because we’d like to hand the parser our socket connection, so that the parser can deal with reading from the socket for us.
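To make the "IO object or String object" point concrete, here is a small sketch that feeds the same JSON to the parser both ways. `ScalarCollector` and `collect_scalars` are names made up for this example (they are not part of Psych), and a StringIO stands in for the socket we will use later.

```ruby
require 'psych'
require 'stringio'

# Hypothetical handler for illustration: remembers every scalar it is handed.
class ScalarCollector < Psych::Handler
  attr_reader :scalars

  def initialize
    @scalars = []
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @scalars << value
  end
end

# Hypothetical helper: parse +input+ (a String or an IO-like object)
# and return the scalars the handler saw, in order.
def collect_scalars(input)
  handler = ScalarCollector.new
  Psych::Parser.new(handler).parse(input)
  handler.scalars
end

p collect_scalars('{"foo":"bar"}')               # parse a String
p collect_scalars(StringIO.new('{"foo":"bar"}')) # parse an IO-like object
```

Both calls should hand the handler the same events, which is what lets us swap a String for a socket later without touching the listener.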
Authentication
Twitter requires us to authenticate before we can consume a feed. Stream authentication is done via Basic Auth. Let’s write a class that can authenticate and read from the stream. Once we do that, we’ll concentrate on parsing the stream.
require 'socket'

class StreamClient
  def initialize user, pass
    @ba = ["#{user}:#{pass}"].pack('m').chomp
  end

  def listen
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"

    # Read the headers
    while (line = socket.readline) != "\r\n"
      puts line if $DEBUG
    end

    # Consume the feed
    while line = socket.readline
      puts line
    end
  end
end

StreamClient.new(ARGV[0], ARGV[1]).listen
This class takes a username and password and calculates the basic auth signature. When “listen” is called, it opens a connection, authorizes, reads the response headers, and starts consuming the feed.
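The "basic auth signature" is just the Base64 encoding of "user:pass". Here is a sketch of that one step on its own. `basic_auth_value` is a hypothetical helper name, and it uses `pack('m0')` rather than the `pack('m').chomp` in the class above. That is a swapped-in variant, not what the original code does: `'m'` breaks long encoded output across several lines and `chomp` only removes the last newline, while `'m0'` emits no newlines at all.

```ruby
# Hypothetical helper: build the value that goes after "Authorization: Basic ".
# 'm0' means Base64 with no line breaks (available since Ruby 1.9).
def basic_auth_value(user, pass)
  ["#{user}:#{pass}"].pack('m0')
end

# The well-known example credentials from the HTTP Basic Auth RFC.
puts "Authorization: Basic #{basic_auth_value('Aladdin', 'open sesame')}"
```

For short credentials both forms give the same string, so the difference only matters once the username and password together get long.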
What comes back isn’t valid JSON. Instead, each chunk is a header (a hexadecimal number) indicating the length of the JSON chunk, the JSON chunk, then a trailing “\r\n”. We would like each JSON chunk to arrive as its own YAML document instead, opened by a “---” line and closed by a “...” line.
A stream framed that way is a sequence of valid YAML documents. If the stream looked like this, we could feed it straight to our YAML processor no problem. How can we modify the stream to be suitable for our parser?
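As a quick check that the parser really is happy with that framing, here is a sketch that parses a made-up two-document stream. The JSON in `SAMPLE_STREAM` is invented for the example (it is not real Twitter output), and `DocumentCounter` and `summarize` are hypothetical names.

```ruby
require 'psych'

# Invented sample: two JSON chunks, each wrapped as a YAML document.
SAMPLE_STREAM = <<~YAML
  ---
  {"foo":"bar"}
  ...
  ---
  {"bar":"baz"}
  ...
YAML

# Hypothetical handler: counts documents and collects scalars.
class DocumentCounter < Psych::Handler
  attr_reader :documents, :scalars

  def initialize
    @documents = 0
    @scalars = []
  end

  def start_document(version, tag_directives, implicit)
    @documents += 1
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @scalars << value
  end
end

# Hypothetical helper: returns [document count, scalars seen].
def summarize(stream)
  handler = DocumentCounter.new
  Psych::Parser.new(handler).parse(stream)
  [handler.documents, handler.scalars]
end

p summarize(SAMPLE_STREAM)
```

The handler gets a `start_document` event per chunk, so one long-lived parser can keep consuming documents for as long as the stream keeps producing them.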
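Here is our completed program. A producer thread reads each chunk from the socket and writes it into a pipe, wrapped in “---” and “...”, while the parser reads from the other end of the pipe: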
require 'socket'
require 'psych'

class StreamClient
  def initialize user, pass
    @ba = ["#{user}:#{pass}"].pack('m').chomp
  end

  def listen listener
    socket = TCPSocket.new 'stream.twitter.com', 80
    socket.write "GET /1/statuses/sample.json HTTP/1.1\r\n"
    socket.write "Host: stream.twitter.com\r\n"
    socket.write "Authorization: Basic #{@ba}\r\n"
    socket.write "\r\n"

    # Read the headers
    while (line = socket.readline) != "\r\n"
      puts line if $DEBUG
    end

    reader, writer = IO.pipe

    # Wrap each chunk in YAML document markers and write it to the pipe
    producer = Thread.new(socket, writer) do |s, io|
      loop do
        io.write "---\n"
        io.write s.read(s.readline.strip.to_i(16)) # length header is hex
        io.write "...\n"
        s.read 2 # strip the blank line
      end
    end

    parser = Psych::Parser.new listener
    parser.parse reader

    producer.join
  end
end

class Listener < Psych::Handler
  def initialize
    @was_text = false
  end

  # Print the scalar that comes right after a "text" key
  def scalar value, anchor, tag, plain, quoted, style
    puts value if @was_text
    @was_text = value == 'text'
  end
end

StreamClient.new(ARGV[0], ARGV[1]).listen Listener.new
Great! In 30 lines, we’ve been able to provide an event based API for consuming Twitter streams. Were it not for the feed munging, we could reduce that by 9 lines!
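If you want to poke at the feed munging without a network connection, here is a rough offline sketch of the same pipe-and-thread idea. It makes a few assumptions: a StringIO plays the role of the socket, the chunks are invented, and `chunk`, `reframe`, `texts_from`, and `TextCollector` are hypothetical names. Unlike the program above it writes a newline before “...” (the fake chunks don't end in one) and it stops at end of input, which the real stream never reaches.

```ruby
require 'psych'
require 'stringio'

# Same idea as Listener above, but collects instead of printing.
class TextCollector < Psych::Handler
  attr_reader :texts

  def initialize
    @texts = []
    @was_text = false
  end

  def scalar(value, anchor, tag, plain, quoted, style)
    @texts << value if @was_text
    @was_text = value == 'text'
  end
end

# Hypothetical helper: frame a JSON string as hex length, body, "\r\n".
def chunk(json)
  "#{json.bytesize.to_s(16)}\r\n#{json}\r\n"
end

# Copy chunks from +source+ to +sink+, one YAML document per chunk.
def reframe(source, sink)
  until source.eof?
    size = source.readline.strip.to_i(16) # length header is hex
    sink.write "---\n"
    sink.write source.read(size)
    sink.write "\n...\n"
    source.read(2) # skip the trailing "\r\n"
  end
ensure
  sink.close # lets the parser see end of input
end

# Run the producer in a thread and parse from the other end of the pipe.
def texts_from(source)
  reader, writer = IO.pipe
  producer = Thread.new { reframe(source, writer) }
  handler = TextCollector.new
  Psych::Parser.new(handler).parse(reader)
  producer.join
  reader.close
  handler.texts
end

fake_feed = StringIO.new(chunk('{"text":"hello"}') + chunk('{"text":"world","id":1}'))
p texts_from(fake_feed)
```

Closing the write end of the pipe is what lets `parse` return here. The real client never closes it, which is why it keeps printing tweets until you kill it.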
Anyway, have fun playing with this code, and I encourage you to try out Ruby 1.9.2. I think it’s really fun! PEW PEW PEW! HAPPY SATURDAY!