Long time, no blog...
Well, it's not really a new blog post but a more detailed description of the demo I've shown at the JAX2011 conference in May. It's about big data - here is the preso.
After having explained some theoretical stuff I have briefly introduced some technologies as well as corresponding open source products using a couple of typical questions/use cases one could have dealing with big data and searching for a way to tame it.
I had a demo setup with me to show which use case can be solved how using which technology. The demo itself is extremely overdone, but should show how a very complex, hypothetic scenario could be implemented using appropriate technologies. Please find the whole demo code here.
"jhtsrv" contains the very basic implementation of the front end web server as well as the Esper and Riak abstractions for the on-going data collection. "repl" contains an own Hadoop map-reduce job as well as the corresponding Swift and Riak abstractions. "stat" contains an R-script for plotting/statistics. "soapui" contains a SoapUI project to stress the web server.
Now enough intro. What was the goal? Imagine an attempt to do a global, world-wide voting. Let's say, we vote for: "should we escape to Mars real fast?". We want the web-based voting platform to be very fast, so we need minimal latency for the very and only operation this platform provides: based on an IP address of the user, to store a YES or a NO. Ok, this is real stupid since big organizations might be filed through one single proxy IP and we can't distinguish single people behind it, but hey - it's a hypothetic demo, ok? So we ignore this issue.
We want to alert a responsible person in a region when within some time a configured number of negative votes appears. Alerts must happen as data comes in, not later.
Regardless of whether the vote is negative or positive, we need to file it as reliably as possible, but we would accept some faults as long as users can vote almost without latency using their smart phones, browsers etc.
Once a day, we want to analyze the votes statistically in our headquarters. For this, it's sufficient to have a snapshot of what happened till then. But we need the data to be available for analysis real fast, so also no considerable latency can be accepted for data preparation.
What we do after statistical analysis is to create a world map with green points for positive and red points for negative votes. We translate IP adresses into corresponding geographic locations for this.
Ok, yes, it would be a useless graphical mess with colors overlapping each other. But as I already wrote - it's an experiment, a demo.
Now. How would we build this? First of all: we need to devide this global platform into regional data centers responsible for a piece of the whole work. Will be to expensive? Well, with a global question like this money wouldn't play a big role anymore. But we definitely need geographic proximity of users to their point of voting in order to have minimal latency. Thus, we need to go to the internet edges like CDNs do.
The shorter the network distance and the less infrastructure stuff on the way, the less latency: less fault handling, lost packets, timeouts, waiting etc.
The more independent servers not sharing a bit of data or a critical computing resource, the less latency: no load on an overloaded centralized infrastructure, scalability directly at the connection point etc.
The other reason why we would go for a regional data center is to crunch this mega huge rolling stone of data into smaller pieces. We would store smaller data sets locally in the regional data centers and find a way to fit them together later.
And generally: the less danger of global disaster such as the central data center being unavailable for all users, the... Well, this explains itself.
Ok, the tiny web server I use in the demo is a joke. But I hope the reader can abstract and imagine a whole group of web servers. There is no state to be managed, no dynamic content or such. Requests can be balanced between several web servers easily, because the technologies we use behind would be no bottle necks or single points of failure - we can scale them out as well. It's a distributed data store (call it a NoSQL database if you like, but I won't) and a stream processor which can be centralized or decentralized, cache or database backend etc. - depending on the use case.
So far, so good. Now how would we write votes in the regional data centers almost reliably but immediately and extremely fast while checking the same data for negative votes within some period of time?
First, we need a distributed (cluster) data store to store our data on several nodes redundantly. Second, this data store must provide sloppy write quorum, and we wouldn't want to wait for the confirmation of a durable write on any of the cluster nodes. So yes, we take Riak as a real cool Dynamo's implementation.
But before we throw the data on Riak, we asynchronously push it into the event stream of Esper. We do some CEP there in order to alert on N negative votes within X minutes. Ok, we are a little bit inaccurate - we don't consider the global votes in the threshold calculation, but only in one data center. So, many alerts wouldn't get fired when in the past 30 seconds in Germany and in the USA we had 120 negative votes. But it's the price we pay for decentralization, and it must be an acceptable one. And more than that: whom to alert when we pass a threshold in two countries? It's getting more and more unnecessarily complex - let's stick with data center borders.
So, Esper would check the data as it comes in - ideally without any storage, right from the memory or a distributed memory cache when we have a scenario with several web servers in a regional data centers, which is very likely. Esper has an HA option/package, which I didn't consider for the demo though and which is able to persist streams and to distribute them so different engines on different nodes can pull events out kind of coordinated. One can still consider a queue for this. But let's move on assuming that Esper does what it does - CEP, maybe on several machines. On the fly.
Now back to Riak. The sloppy write quorum with 3 nodes storing each vote could be 1. We only need a confirmation from one node, the rest Riak should do gossip behind the scenes, while we aren't waiting anymore. And this node does only need to say: "I have the record". Not "I have stored the record". Such setup could be weak since some records could really get lost, so we could consider quorum of 2. Or we could think: 1 is ok, when the node crashes, another one would take over the data, so what we lose is minimal. When we want that 6 billion people vote real quick, a couple of votes can really get lost without changing the whole result.
Now we have the data locally and we have alerts. What about the daily statistics? First of all, we need to get the data somehow from the regions to the headquarters. We need a sort of data warehouse there, right? But how do you collect the data from thousands of small data centers all over the world, while it's still getting written into the stores? How can you move the data fast enough to one data center?
But why one data center? The requirement was to access the data from the headquarters, but not to have it in its data center. So what would we do? We push the votes from data centers to a cloud based data store on the daily basis. We can access the cloud data store from everywhere, including data centers and headquarters, so.
So, we have a Riak store in the data center and need to move its current data to the cloud while data still comes in. For this, we would use the fact that we have a distributed data store. But the real weak point in the whole setup is that we are reading data while it's getting written, and then we write (delete) it while it's still getting written. A very careful testing and setup is necessary for this to work properly, if it really works at all.
First step is to get the snapshot - a "frozen" collection of keys to work with. Everything that came in after we pull the snapshot will be outside the cloud replication/load run.
Having the snapshot, we map-reduce the replication job. At least we do map, since there is not much to do in the reduce phase, and we use Hadoop as the framework for that. It can use a cluster of nodes to devide such a big job into smaller subtasks running on a data split, so we need to do a custom split as well as custom read format which can be seen in the demo code.
Running on several nodes, our mapper aggregates votes into groups of votes to prepare larger objects for the cloud store (it doesn't pay out to store small objects like single votes in the cloud, and we would never need to access them separately). We read a stored vote from Riak and delete it afterwards, but also with a sloppy delete quorum, so we don't really have to wait for the whole store to be in a consistent state conserning this one vote. Again, we are dealing with so many records and have quite fuzzy statistical accuracy - we are allowed to be a little inconsistent and to miss records or to have duplicates.
The mapper reads on one hand and writes on the other - directly to the cloud using its REST interface. To simulate cloud, we use Swift (OpenStack Object Storage). Its implementation is similar to Dynamo, and I guess it would also be able to provide eventual consistency and sloppy quorum when storing objects (though I didn't find a way to configure this from the client. Maybe it's done on the container basis, I didn't look further). We need such a store as cloud based store in order to scale as expected. Though, if we would lose aggregated vote groups pushing them sloppily into cloud, this would hurt much more than with single votes - here, we need to be much more careful.
Now, the data is in the cloud once a day. What's next? Now we take R and do some statistical stuff to analyze it. For example, in the demo I just have an "algorithm" to calculate big city coordinates from the user's IP address instead of doing some IP/geo location. Why? Because I did the demo in a conference and didn't know if the network connection would work. Normally, we would take the IP address, find out where it belongs and try to plot this point on the world map.
That's it. A simple demo. Sure, it was possible to solve it much easier. Sure, it's hypothetic. Sure, it would cost to much to build it all in the real world. And of course, some of my assumptions and simplifications are completely or partially wrong. But the whole thing also runs on one notebook with Ubuntu, so why not give it a try and think further/golbally? I hope I could show some use cases for some cool technologies and a way of thinking in big data scenarios. Well, to some point, of course.
I would appreciate any pointers/comments to/on where the thoughts or the implementation don't fit the common sence or the science/experience.
Thank you for reading.
No comments:
Post a Comment