Aside

storm_2

With Apache Hadoop YARN as its architectural center, Apache Hadoop continues to attract new engines to run within the data platform, as organizations want to efficiently store their data in a single repository and interact with it for batch, interactive and real-time streaming use cases. Apache Storm brings real-time data processing capabilities to help capture new business opportunities by powering low-latency dashboards, security alerts, and operational enhancements integrated with other applications running in the Hadoop cluster.

The community recently announced the release of Apache Storm 0.9.3. With this release, the team closed 100 JIRA tickets and delivered many new features, fixes and enhancements, including these three important improvements:

Screen Shot 2014-12-15 at 3.28.34 PM

This blog gives a brief overview of these new features in Apache Storm 0.9.3 and also looks ahead to future plans for the project.

HDFS Integration

Apache Storm’s HDFS integration consists of several bolt and Trident state implementations that allow topology developers to easily write data to HDFS from any Storm topology. Many stream processing use cases involve storing data in HDFS for further batch processing and further analysis of historical trends.

HBase Integration

Apache Storm’s HBase integration includes a number of components that allow Storm topologies to both write to and query HBase in real-time.

Many organizations use Apache HBase as part of their big data strategy for batch, interactive, and real-time workflows. Storm’s HBase integration allows users to leverage HBase data assets for streaming queries, and also use HBase as a destination for streaming computation results.

Output to Apache Kafka

Apache Storm has supported Kafka as a streaming data source since version 0.9.2-incubating. Now Storm 0.9.3 brings a number of improvements to the Kafka integration and also adds the ability to write data to one or more Kafka clusters and topics.

The ability to both read and write to Kafka unlocks additional potential in the already powerful combination of Storm and Kafka. Storm users can now use Kafka as a source of and destination for streaming data. This allows for inter-topology communication, combining spout and bolt-based topologies with Trident-based data flows. It also enables integration with any external system that supports data ingest from Kafka.

Plans for the Future

In upcoming releases of Apache Storm, the community will be focusing on enhanced security, high availability, and deeper integration with YARN.

The Apache Storm PMC would like to thank the community of volunteers who made the many new features and fixes in this release a reality.

Download Apache Storm and Learn More

Advertisements
Aside

The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer API.

Venerable MapReduce has been Apache Hadoop‘s work-horse computation paradigm since its inception. It is ideal for the kinds of work for which Hadoop was originally designed: large-scale log processing, and batch-oriented ETL (extract-transform-load) operations.

As Hadoop’s usage has broadened, it has become clear that MapReduce is not the best framework for all computations. Hadoop has made room for alternative architectures by extracting resource management into its own first-class component, YARN. And so, projects like Impala have been able to use new, specialized non-MapReduce architectures to add interactive SQL capability to the platform, for example.

Today, Apache Spark is another such alternative, and is said by many to succeed MapReduce as Hadoop’s general-purpose computation paradigm. But if MapReduce has been so useful, how can it suddenly be replaced? After all, there is still plenty of ETL-like work to be done on Hadoop, even if the platform now has other real-time capabilities as well.

Thankfully, it’s entirely possible to re-implement MapReduce-like computations in Spark. They can be simpler to maintain, and in some cases faster, thanks to Spark’s ability to optimize away spilling to disk. For MapReduce, re-implementation on Spark is a homecoming. Spark, after all, mimics Scala‘s functional programming style and APIs. And the very idea of MapReduce comes from the functional programming language LISP.

Although Spark’s primary abstraction, the RDD (Resilient Distributed Dataset), plainly exposes map() and reduce() operations, these are not the direct analog of Hadoop’s Mapper or Reducer APIs. This is often a stumbling block for developers looking to move Mapper and Reducer classes to Spark equivalents.

Viewed in comparison with classic functional language implementations of map() and reduce() in Scala or Spark, the Mapper and Reducer APIs in Hadoop are actually both more flexible and more complex as a result. These differences may not even be apparent to developers accustomed to MapReduce, but, the following behaviors are specific to Hadoop’s implementation rather than the idea of MapReduce in the abstract:

  • Mappers and Reducers always use key-value pairs as input and output.
  • A Reducer reduces values per key only.
  • A Mapper or Reducer may emit 0, 1 or more key-value pairs for every input.
  • Mappers and Reducers may emit any arbitrary keys or values, not just subsets or transformations of those in the input.
  • Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls. They support asetup() and cleanup() method, which can be used to take actions before or after a batch of records is processed.

This post will briefly demonstrate how to recreate each of these within Spark — and also show that it’s not necessarily desirable to literally translate a Mapper and Reducer!

Key-Value Pairs as Tuples

Let’s say we need to compute the length of each line in a large text input, and report the count of lines by line length. In Hadoop MapReduce, this begins with a Mapper that produces key-value pairs in which the line length is the key, and count of 1 is the value:

public class LineLengthMapper
    extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}

It’s worth noting that Mappers and Reducers only operate on key-value pairs. So the input to LineLengthMapper, provided by a TextInputFormat, is actually a pair containing the line as value, with position within the file thrown in as a key, for fun. (It’s rarely used, but, something has to be the key.)

The Spark equivalent is:

Reducer and reduce() versus reduceByKey()

To produce a count of line lengths, it’s necessary to sum the counts per length in a Reducer:

public class LineLengthReducer
    extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}

The equivalent of the Mapper and Reducer above together is a one-liner in Spark:

It is worth pointing out here that a Reducer’s reduce() method receives a stream of many values, and produces 0, 1 or more results. reduceByKey(), in contrast, accepts a function that turns exactly two values into exactly one — here, a simple addition function that maps two numbers to their sum. This associative function can be used to reduce many values to one for the caller. It is a simpler, narrower API for reducing values by key than what a Reducer exposes.

Mapper and map() versus flatMap()

Now, instead consider counting the occurrences of only words beginning with an uppercase character. For each line of text in the input, a Mapper might emit 0, 1 or many key-value pairs:

map() will not suffice here, because map() must produce exactly one output per input, but unlike before, one line needs to yield potentially many outputs. Again, the map() function in Spark is simpler and narrower compared to what the Mapper API supports.

The solution in Spark is to first map each line to an array of output values. The array may be empty, or have many values. Merely map()-ing lines to arrays would produce an RDD of arrays as the result, when the result should be the contents of those arrays. The result needs to be “flattened” afterward, and flatMap() does exactly this. Here, the array of words in the line is filtered and converted into tuples inside the function. In a case like this, it’s flatMap()that’s required to emulate such a Mapper, not map().

groupByKey()

It’s simple to write a Reducer that then adds up the counts for each word, as before. And in Spark, again,reduceByKey() could be used to sum counts per word. But what if for some reason the output has to contain the word in all uppercase, along with a count? In MapReduce, that’s:

Be careful! groupByKey() works, but also collects all values for a key into memory. If a key is associated to many values, a worker could run out of memory. Although this is the most direct analog of a Reducer, it’s not necessarily the best choice in all cases. For example, Spark could have simply transformed the keys after a call to reduceByKey:

It’s better to let Spark manage the reduction rather than ask it to collect all values just for us to manually sum them.

setup() and cleanup()

In MapReduce, a Mapper and Reducer can declare a setup() method, called before any input is processed, to perhaps allocate an expensive resource like a database connection, and a cleanup() method to release the resource:

The Spark map() and flatMap() methods only operate on one input at a time though, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:

However, this fails for several reasons:

  • It puts the object dbConnection into the map function’s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.
  • map() is a transformation, rather than an operation, and is lazily evaluated. The connection can’t be closed immediately here.
  • Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies.

In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the importantmapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions()function can allocate resources locally at its start, and release them when done mapping many values.

Adding setup code is simple; adding cleanup code is harder because it remains difficult to detect when the transformed iterator has been fully evaluated. For example, this does not work:

Although decidedly less elegant than previous translations, it can be done.

There is no flatMapPartitions() method. However, the same effect can be achieved by callingmapPartitions(), followed by a call to flatMap(a => a) to flatten.

The equivalent of a Reducer with setup() and cleanup() is just a groupByKey() followed by amapPartitions() call like the one above. Take note of the caveat about using groupByKey() above, though.

But Wait, There’s More

MapReduce developers will point out that there is yet more to the API that hasn’t been mentioned yet:

  • MapReduce supports a special type of Reducer, called a Combiner, that can reduce shuffled data size from a Mapper.
  • It also supports custom partitioning via a Partitioner, and custom grouping for purposes of the Reducer via grouping Comparator.
  • The Context objects give access to a Counter API for accumulating statistics.
  • A Reducer always sees keys in sorted order within its lifecycle.
  • MapReduce has its own Writable serialization scheme.
  • Mappers and Reducers can emit multiple outputs at once.
  • MapReduce alone has tens of tuning parameters.

There are ways to implement or port these concepts into Spark, using APIs like the Accumulator, methods likegroupBy() and the partitioner argument in various of these methods, Java or Kryo serialization, caching, and more. To keep this post brief, the remainder will be left to a follow-up post.

The concepts in MapReduce haven’t stopped being useful. It just now has a different and potentially more powerful implementation on Hadoop, and in a functional language that better matches its functional roots. Understanding the differences between Spark’s RDD API, and the original Mapper and Reducer APIs, helps developers better understand how all of them truly work and how to use Spark’s counterparts to best advantage.

Sean Owen is Director of Data Science at Cloudera, an Apache Mahout committer/PMC member, and a Spark contributor.

Source: Cloudera.com

Aside

techsummit.a16z

Visit the future. Join the most forward-thinking people and companies on the planet for an inside look at the technologies that are changing business forever.

Bringing together bleeding-edge startups, big company leaders, and a select group of CIOs and CMOs shaping a variety of industries, Tech Summit 2014 will delve into everything from the latest trends in mobile, payments, big data and cloud, to what’s next in smart devices connected to even smarter services.

As software continues to eat the world, the interactions and relationships between people, things and companies are being reimagined. Along the way, our notion of ownership, community, identity and how we get the job done is changing. How will you change along with it?

The future gets explained October 27-28. For more information, http://techsummit.a16z.com/

 

Aside

Since 2005, Hadoop has been the foundation for hundreds of big data companies, due to its open-sourced nature. Over 170 well-known companies have contributed to its development since launch, and the project is currently valued at over $2 billion.

But what exactly is Hadoop, and why is it so important? In layman’s terms, Hadoop is a framework for creating and supporting big data and large scale processing applications – something that a traditional software isn’t able to do. The whole Hadoop framework relies on 4 main modules that work together:

  1. Hadoop Common is like the SDK for the whole Hadoop framework, providing the necessary libraries and utilities needed by the other 3 modules.

  2. Hadoop Distributed Files System (HDFS) is the file system that stores all of the data at high bandwidth, in clusters (think RAID).

  3. Hadoop Yarn is the module that manages the computational resources, again in clusters, for application scheduling.

  4. Finally, Hadoop Mapreduce is the programming model for creating the large scale and big data applications.

Hadoop is a very powerful framework for big data companies, and its overall use has been on the rise since its inception in 2005 – over 25% organizations currently use Hadoop to manage their data, up from 10% in 2012. Because Hadoop is open source and flexible to a variety of needs, it has been applied to almost every industry imaginable in the current big data boom – from finance, to retail to education and government.

Current-State-of-Hadoop-Infographic_s

Source: SmartDataCollective.com

Aside

big-data-inline-660x515

There’s a new record holder in the world of “big data.”

On Friday, Databricks—a startup spun out of the University California, Berkeley—announced that it has sorted 100 terabytes of data in a record 23 minutes using anumber-crunching tool called Spark, eclipsing the previous record held by Yahoo and the popular big-data tool Hadoop.

The feat is impressive in and of itself, but it’s also a sign that the world of big data—where dozens, hundreds, or even thousands of machines can be used to sort and analyze massive amounts of online information—continues to evolve at a rather rapid pace. Hadoop has long served as the poster child for the big-data movement, but in recent years, the state-of-the-art has moved well beyond the original ideas that spawned it.

Based on research papers that Google published about its own big-data systems in 2003 and 2004, Hadoop sprung up at Yahoo, and it’s now used by many of the web’s biggest names, from Facebook to Twitter and eBay. In the beginning, it wasn’t something that operated in “real-time”—when crunching large amounts of data, you had to wait a while—but now, Spark and other tools, many based on Hadoop, are analyzing massive datasets at much greater speeds.

One of the main problems with Hadoop MapReduce—the original platform—is that it’s a “batch system.” That means it crunches data in batches. It takes a while to crunch each set of information, and if you want to add more data to the process, you have to start over with a new batch. But the state of the art has improved dramatically since Google released those papers in 2003 and 2004. These days, Google uses newer tools like Dremel to analyze enormous amounts of data in near real-time, and the open source world is scrambling to keep up.

Developed by researchers at the University of California, Berkeley who are now commercializing the tech through Databricks, Spark is just one part of this movement. The Silicon Valley startup Cloudera offers a system called Impala, while competitor MapR is developing a Dremel-style tool called Drill. Meanwhile, the open source Hadoop project now offers a new interface called Yarn.

Part of Spark’s appeal is that it can process data in computer memory, as opposed to just using hard disks, much move at slower speeds. But because the amount of data that can fit in-memory is limited, the tool can process data on disks as well, and that’s what Databricks was trying to highlight as it sought to break’s Yahoo’s record on the Gray Sort, which measures the time it takes to sort 100 terabytes of data, aka 100,000 gigabytes.

Yahoo did the sort in 72 minutes with a cluster of 2,100 machines using Hadoop MapReduce last year. Databricks was able to process the same amount of data in 23 minutes using Spark, using only 206 virtual machines running on Amazon’s cloud service. It also sorted a petabtye of data—about 1,000 terabytes — in less than four hours using 190 machines.

Although this appears to be a record for this type of sorting using open source software, there are ways to sort data faster. Back in 2011, Google previouslyconducted a petabyte sort in only 33 minutes, as pointed out by a commenter on the popular programmer hangout Hacker News. But it took 8,000 machines to do what Databricks did with 206. And, as Databricks engineer Reynold Xin tells us, Google didn’t share its process with the world, so we don’t know if it followed the rules specified as part of the Gray Sort.

But most importantly, Databricks did its test using software that anyone can use. “We were comparing with open source project Hadoop MapReduce,” Xin says. “Google’s results are with regard to their own MapReduce implementation that is not accessible to the rest of the world.”

Source: Wired.com

Aside

hacker_cartoon

I recently spoke to about a dozen aspiring data scientists, and a very common concern was, “There are just so many different programming languages, and so many different software packages and databases. I feel overwhelmed, and I don’t know where to start!”

In this post, I will explain everything you need to learn to get started as a data “hacker.”

What is a data hacker?

Harlan Harris and Vincent Granville have both written articles about the different types of data scientists. Harris’s article is more about the roles of data scientists, whereas Granville’s article is more about the skills of data scientists. Harris breaks data scientists into 4 types: Data Businesspeople (technical managers, data entrepreneurs), Data Creatives (hackers, jack-of-all-trades types), Data Developers (big data programmers), and Data Researchers (usually PhDs in computer science or statistics working in academia or in large research labs). I consider myself a jack-of-all-trades, so I think I fit into the Data Creative type. In this article, I will focus on how to become a data hacker (called a Data Creative by Harlan Harris).

So how do I become a Data Hacker?

Hackers tend to have a broad set of technical skills, although they may not be true experts at any one skill:
– Statistical Programming
– Machine Learning
– Visualization
– Reporting/Dashboarding
– Databases
– Big Data
– Data “Munging”

This is a long list! How does someone actually learn all of these things in a reasonable amount of time? The key is to pick a single comprehensive technology stack, and do everything using that stack.

The R-Hadoop technology stack

R-langR is a free, open-source statistical programming language originally based on the S programming language. Here are a few reasons why R is a great place to start for data analysis:
– It’s completely free: SAS and SPSS are expensive to get started with, and you often need to buynew methods if you want to try them out
– It’s comprehensive: almost any statistical or machine-learning task you could think of has pre-built libraries for you to use in R.
– R is easy to learn, and especially good for hacking: you don’t need to have a lot of programming experience to get started doing useful work in R
– R is a full-fledged programming language: unlike SAS or SPSS, R is not just a procedural language for doing data analysis
– R is great for getting a job, especially in the tech industry

 

apache-hadoopHadoop is a free, open-source distributed computing framework. Hadoop is used for all aspects of Big Data: storage, databases, analysis, and even modeling. Hadoop is used at many of the top companies in the world, including Facebook, Twitter, and LinkedIn. When you hear about Hadoop, you typically hear about MapReduce, which is a framework that allows you to solve (extremely) large-scale data processing problems on a cluster of commodity computers. Here are a few reasons why Hadoop is a great way to get started with Big Data:
– Again, it’s completely free
– It’s easy to get started, even if you don’t have your own cluster of computers: check outCloudera for an online trial and a VM you can download for free
– Hadoop is comprehensive: almost any Big Data storage or processing problem can be solved within the Hadoop ecosystem
– Hadoop is great for getting a job: it seems like it’s on every data science job ad nowadays!

 

R-HadoopThe R-Hadoop stack allows to do almost anything you need to for data hacking:
– Statistical Programming: R has packages for data exploration, statistical tests, regression, and everything else you could imagine.
– Machine Learning: The caret package is a wrapper for dozens of machine learning algorithms, and makes it easy to train, tune, and test machine-learning models.
– Visualization: The ggplot2 package allows you to make professional-looking, fully customizable 2D plots.
– Reporting/Dashboarding: The knitr package allows you to generate beautiful, dynamic reports with R. The shiny package is a web framework for building stylish, interactive web apps with R.
– Databases: Hive is a highly-scalable data warehousing system built on Hadoop for ad-hoc SQL-style querying of huge datasets (developed at Facebook). Cassandra (used by Netflix) and HBase (used by Twitter) are other database solutions for other purposes built on Hadoop.
– Big Data: This is what Hadoop was made for. Hadoop allows you to store and process essentially unlimited amounts of data on commodity hardware (you don’t need a supercomputer anymore). And depending on how big you mean by “big” data, R has some spectacular libraries for working directly with it, like data.table.
– Data “Munging”: Data munging refers to “cleaning” data and rearranging data in more useful ways (think parsing unusual date formats, removing malformed values, turning columns into rows, etc.). Both R and Hadoop have facilities for this. R is awesome and easy for small to moderate-sized data sets, and Hadoop allows you to write your own programs to clean and rearrange really big data sets when needed.

R and Hadoop can both be run on Windows, but it is much more natural and easier to use a Unix-based system. That might be a bit of a learning curve, but the rewards of learning Unix are incredibly high, and it’s great for a resume.

What the R-Hadoop stack is not great for

R and Hadoop can cover most use-cases, but there are situations where you’ll want to use something else. For example, Python has libraries that make text-mining much easier and more scalable than it is in R. And sometimes, if you’re building a web app, Shiny is just not flexible enough, so you’ll want to use a more traditional web framework. But for most purposes, most of the time, you can get by with R and Hadoop.

Why just stick to one technology stack?

Some of you might be saying: “Shouldn’t you always just use the right tool for the job? Why would you let yourself get so sucked in to one ecosystem?” That is a very good point, but I think that there are huge advantages to focusing on one thing, especially when you are starting out. First of all, you will waste lots of time if you switch training paths and resources all of the time, because the startup costs of learning a new technology are so high. Secondly, it is extremely useful and motivating to focus on one technology, because getting good at one thing is the fastest way to be able to solve real-world problems (instead of the toy examples you usually use when learning a new technology). And finally, R and Hadoop are often the best tool for the job, anyway!

So how do I get started?

Here’s how I recommend you get started: first, start with “toy” examples and a little formal training. Then, pick a simple project that interests you, and then try it out! You will learn a lot by overcoming the natural roadblocks that arise in working with real data. Here are some good resources to get you started:

Intro to R

Intro to Hadoop

Project ideas

  • Find a dataset of historical results from the World Series, create a couple of visualizations using ggplot2, and create a simple web-app in Shiny to display the visualizations.
  • Build a classification model to identify survivors of the Titanic using the Kaggle Titanic dataset and R’s caret package
  • Pull the 1990 and 2000 US Census data into a Hive database on Amazon Web Services. Can you find any surprising demographic differences between 1990 and 2000 with a few Hive queries?

Source: Will-stanton.com

Aside

YARN and Apache Storm: A Powerful Combination

YARN changed the game for all data access engines in Apache Hadoop. As part of Hadoop 2, YARN took the resource management capabilities that were in MapReduce and packaged them for use by new engines. Now Apache Storm is one of those data-processing engines that can run alongside many others, coordinated by YARN.

YARN’s architecture makes it much easier for users to build and run multiple applications in Hadoop, all sharing a common resource manager. We see those applications arising and are excited by the additional opportunities that brings for Storm.

Now let’s talk about Apache Storm…

What is Apache Storm and Why is it Useful?

Apache Storm is a distributed, fault tolerant, and highly scalable platform for processing streaming data. Storm supports a wide range of use cases, including real-time analytics, machine learning, continuous computation, and more. It is also extremely fast, with the ability to process over a million records per second per node on a modest sized cluster.

With the explosion of data sources in recent years, many Apache Hadoop users have recognized the necessity to process data in real time while also maintaining traditional batch and interactive data workflows. Apache Storm fills that real-time role and offers tight integration with many of the tools and technologies commonly found in the Hadoop ecosystem.

storm_2
The official Apache Storm logo, created by logo contest winner Jennifer Lee

A Little History

Nathan Marz originally developed storm while he was at BackType (later acquired by Twitter), and it was open-sourced in September of 2011. In September 2013, Storm entered the Apache Software Foundation (ASF) as an incubator project. Since then, the Storm community has flourished and grown significantly, delivering a number of software releases in accordance with the strict licensing guidelines required of any Apache project.

The Future of Apache Storm

These rules are important to both software developers and users for the following reasons:

  1. They provide certain legal protections to both developers and end users/organizations.
  2. They ensure that a diverse and self-sustaining community backs the project
  3. They promote a strong, vibrant developer community that ensures that the project will continue to improve and innovate new features.

We are pleased to say that over the course of the past year, the Apache Storm community has demonstrated the ability to adhere to these requirements, and is finalizing the steps necessary to graduate to an Apache Top Level Project.

Like any promising student on the verge of graduation, the Apache Storm community is looking ahead to a bright future. So what can we expect of Apache Storm?

YARN Integration

YARN

Apache Storm’s Place in Hortonworks Data Platform’s YARN-based Architecture

YARN fundamentally changed Hadoop resource management and enabled the deployment of new services in the Hadoop infrastructure. Many users are eager to see Apache Storm take advantage of these YARN capabilities.

There are a number of efforts underway to bring YARN support to Apache Storm. In the near term, Apache Slider will bring YARN support not just to Storm, but also to virtually any long-running application. This significantly lowers the barrier to adding YARN support to an existing application or framework.

In the longer term, we will see a much deeper, native integration between Apache Storm and YARN. There are countless opportunities in this area including dynamic, elastic scaling of YARN-based Storm clusters in response to resource utilization.

Today, you can think of a Storm cluster as an infrastructure to which you deploy applications (topologies). Tomorrow, this distinction will be blurred significantly, allowing developers and data scientists to focus more on their applications, and less on the infrastructure in which they run.

Secure, Multi-Tenant Deployment

Much like the early days of Hadoop, Apache Storm originally evolved in an environment where security was not a high-priority concern. Rather, it was assumed that Storm would be deployed to environments suitably cordoned off from security threats. While a large number of users were comfortable setting up their own security measures for Storm, this proved a hindrance to broader adoption among larger enterprises where security policies prohibited deployment without specific safeguards.

Yahoo! hosts one of the largest Storm deployments in the world, and the engineering team recognized the need for security early on, so it implemented many of the features necessary to secure its own Apache Storm deployment. Yahoo!, Hortonworks, Symantec, and the broader Apache Storm community have been working on integrating those security innovations into the main Apache code base.

That work is nearing completion, and is slated to be included in an upcoming Apache Storm release. Some of the highlights of that release include:

  • Kerberos Authentication with Automatic Credential Push and Renewal
  • Multi-Tenant Scheduling
  • Secure integration with other Hadoop Projects (such as ZooKeeper, HDFS, HBase, etc.)
  • User isolation (Storm topologies run as the user who submitted them)

In the future, you can expect to see further integration between Apache Storm and security-focused projects like Apache Argus (formerly XA Secure).

Scalability Improvements

Apache Storm is already highly scalable and fast enough so that most common use cases require cluster sizes of less than 20 nodes, even with SLAs that require processing more than a million records per second. Some users, however, have experienced limitations when trying to deploy larger clusters in the range of several thousand nodes.

Each Apache Storm release has seen incremental improvement in terms of performance and scalability, and you can expect this trend to continue. In the future, Apache Storm will scale to several thousand nodes for those who need that level of real-time processing power.

High Availability

Experienced Storm users will recognize that the Storm Nimbus service is not a single point of failure in the strictest sense (i.e. loss of the Nimbus node will not affect running topologies). However, the loss of the Nimbus node does degrade functionality for deploying new topologies and reassigning work across a cluster.

Upcoming releases will eliminate this “soft” point of failure by supporting an HA Nimbus. Multiple instances of the Nimbus service run in a cluster and perform leader election when a Nimbus node fails.

Enhanced Tooling and Language Support

langs

Whether you are developing a batch processing, a stream processing, or a hybrid application (e.g. a Lamda architecture), higher-level abstractions and visualization tools enable developers and data scientists to more easily understand and manipulate data flows.

Tools such as Pig, Cascading, and the SQL on Hadoop ecosystem are excellent examples of how those abstractions can significantly improve productivity. This applies equally to stream processing. In the future we will see many of the same concepts implemented on top of Storm, along with more advanced tools to visualize and manage streaming data flows.

We will also see broader language support for Storm. Apache Storm has always embraced polyglot programming to better support the languages with which developers are most comfortable and productive.

The Rise of the Micro-Batch

The recent surge of interest in Apache Spark and the fledgling Spark Streaming project have raised awareness of the concept of micro-batching, where real-time data streams are partitioned into smaller “mini batches” for processing. With micro-batching, you trade the inherent low latency of one-at-a-time processing for higher throughput (and higher latency).

One of the interesting properties of micro-batching is that given the right primitives, it’s possible to implement exactly-once processing, which is just what Storm’s Trident API does.

Storm has supported both the one-at-a-time as well as micro-batch processing (via the Trident API) since (pre-Apache) version 0.8.0. Trident differentiates Storm from other stream processing frameworks because it supports Distributed Remote Procedure Calls (DRPCs). DRPCs leverage Storm’s inherent parallelism in synchronous request-response invocations. In the future you can expect to see broader adoption of Storm’s Trident API, with API enhancements and improved tooling and language support.

The future of Apache Storm is very bright indeed, with exciting times ahead. The Storm community is growing, and we encourage anyone interested to participate by contributing to the project.

Discover and Learn More

  • Visit our Apache Storm Roadmap
  • Read about Apache Storm
  • Try Apache Storm Tutorials

Source: Hortonworks.com

Aside

YARN2

The Apache Hadoop community has voted to release Apache Hadoop 2.5.0.

Apache Hadoop 2.5.0 is a minor release in the 2.x release line and includes some major features and improvements, including:

More details can be found in the documentation and release notes.

The next minor release (2.6.0) is expected to include some major features as well, including transparent encryption in HDFS along with a key management server,  work-preserving restarts of all YARN daemons, and others. Refer to the roadmap for a full, updated list.

Currently, Hadoop 2.5 is scheduled to ship inside CDH 5.2 (in late 2014).

Aside

Here is a compelling presentation by Databricks CEO Ion Stoica that sets the stage for Spark’s continued advance in the big data ecosystem. The Databricks Cloud provides the full power of Spark, in the cloud, plus a powerful set of features for exploring and visualization your data, as well as writing and deploying production data products.

The Databricks Cloud includes the following features:

The Full Power of Spark

  • Databricks Cloud uses only the 100% open source Spark API
  • Perform sophisticated analysis with MLlib (machine learning), GraphX, and Spark SQL
  • Your code runs the same way on any Spark distribution

Fully Managed

  • Get started with big data in seconds
  • Databricks Cloud manages metadata, launching clusters, and access control
  • Work seamlessly with your existing S3 data

Powerful Notebooks & Beautiful Dashboards

  • Visualize data right as you explore it
  • Collaborate in real-time
  • Export your analysis to production dashboards in seconds

Source: Inside-Bigdata.com

Aside

Applications using HDFS, such as Impala, will be able to read data up to 59x faster thanks to this new feature.

Server memory capacity and bandwidth have increased dramatically over the last few years. Beefier servers make in-memory computation quite attractive, since a lot of interesting data sets can fit into cluster memory, and memory is orders of magnitude faster than disk.

For the latest release of CDH 5.1, Cloudera contributed a read caching feature to HDFS to allow applications in the Apache Hadoop ecosystem to take full advantage of the potential of in-memory computation (HDFS-4949). By using caching, we’ve seen a speedup of up to 59x compared to reading from disk, and up to 3x compared to reading from page cache.

We’ll cover performance evaluation in more detail in a future blog post. Here, we’ll focus on the motivation and design of HDFS caching.

Motivation

A form of memory caching is already present on each HDFS DataNode: the operating system page cache. The page cache automatically caches recently accessed data on the local filesystem. Because of the page cache, reading the same file more than once will often result in a dramatic speedup. However, the OS page cache falls short when considered in the setting of a distributed system.

One issue is the lack of global information about the in-memory state of each node. Given the choice of multiple HDFS replicas from which to read some data, an application is unable to schedule its tasks for cache-locality. Since the application is forced to schedule its tasks blindly, performance suffers.


When a data analyst runs a query, the application scheduler chooses one of the three block replica locations and runs its task there, which pulls the replica into the page cache (A). However, if the analyst runs the same query again, the scheduler has no way of knowing which replica is in the page cache, and thus no way to place its task for cache locality (B).

Another issue is the page cache’s replacement algorithm, which is a modified version of “least-recently used” eviction. LRU-like algorithms are susceptible to large scans that wipe out the existing contents of the cache. This happens quite commonly on shared Hadoop clusters.

Consider a data analyst running interactive queries on a memory-sized working set: If a large I/O-heavy MapReduce job runs at the same time, it will evict the data analyst’s working set from the page cache, leading to poor interactive performance. Without application-level knowledge of which dataset to keep in memory, the page cache can do no better for mixed workloads. Finally, although reading data from the page cache is faster than disk, it is still inefficient compared to reading directly from memory (so-called zero-copy reads).

Another source of inefficiency is checksum verification. These checksums are intended to catch disk and network errors, and can theoretically be skipped if the client is reading from local in-memory data that has already been checksummed. However, skipping redundant checksumming safely is impossible with the page cache since there’s no way to guarantee that a read is coming from memory. By fixing these two issues, we were able to improve read performance by up to 3x compared to reading from page cache.

Architecture

The above issues resulted in the following three design requirements:

  1. Global knowledge of cluster cache state, so tasks can be scheduled for cache locality
  2. Global control over cluster cache state, for predictable performance for mixed workloads
  3. Pinning of data in local caches, to enable zero-copy reads and skipping checksums

Based on these requirements, we decided to add centralized cache management to the NameNode.


Example of an HDFS client caching a file: First, itsends a cache directive asking the NameNode to cache the file. The NameNode chooses some DataNodes to cache the requested file, with cache commands piggy-backed on the DataNode heartbeat. DataNodes respond with a cache report when the data is successfully cached.

Caching is explicit and user-driven. When a user wants something cached, they express their intent by creating a cache directive on the NameNode. A cache directive specifies the desired path to cache (meaning a file or directory in HDFS), a desired cache replication factor (up to the file’s replication factor), and the cache pool for the directive (used to enforce quotas on memory use). The system does not automatically manage cache directives, so it’s up to users to manage their outstanding cache directives based on their usage patterns.

Assuming that this cache directive is valid, the NameNode will attempt to cache said data. It will select cache locations from the set of DataNodes with the data on disk, and ask them to cache the data by piggy-backing a cache command on the DataNode heartbeat reply. This is the same way block replication and invalidation commands are sent.

When a DataNode receives a cache command, it pulls the desired data into its local cache by using mmap() andmlock() methods and then verifies its checksums. This series of operations guarantees that the data will remain resident in memory, and that it is safe to read without further checksum verification. Using the mmap() and mlock()methods has the advantage of storing the data off-heap, so large amounts of data can be cached without affecting garbage collection.

Because mlock() takes advantage of the OS page cache, if the block is already held there, we don’t need to copy it. The disadvantage of mlock is that the block must already exist in the filesystem before it can be locked in memory. So we cannot cache replicas on nodes that don’t have the replica already on disk.

DataNodes periodically send cache reports to the NameNode, which contain the state of their local cache. As soon as the NameNode knows that a block has been successfully cached on a DataNode, application schedulers can query the NameNode for this information and use it to schedule tasks for cache-locality.

Zero-copy Reads

Zero-copy read (ZCR) is the final step in efforts to improve the efficiency of the HDFS read path. Copies are one of the most obvious sources of inefficiency; the more time spent copying data, the fewer CPU cycles are left for useful work. ZCR is theoretically optimal in this regard, hence the name “zero-copy.”

The standard HDFS remote read path copies data from the kernel into the DataNode prior to sending it on to theDFSClient via a TCP socket. Short-circuit local reads eliminate this copy by “short-circuiting” the trip through the DataNode. Instead, the client simply reads the block file directly from the local filesystem.

However, even when using short-circuit reads, the DFSClient still needs to copy the data from kernel page cache into the client’s address space. ZCR, implemented in HDFS-4953, allow us to avoid that copy. Instead of copying, we use the mmap() system call to map the block from page cache directly into the client’s address space. ZCR also avoids the context switch overhead of repeated invocations of the read system call, which can be significant.

However, mmap() has some disadvantages. One difficulty is handling I/O errors. If a read() system call encounters an I/O error, it simply returns an error code. Accessing a memory-mapped segment can’t return an error, so any error results in a SIGBUS signal instead. Unless a signal handler has been installed, the calling process is terminated.

Fortunately, if a client is reading data that is cached by HDFS, it will never hit an I/O error (and thus never get aSIGBUS) — because the data is pinned in memory with mlock(). This approach lets us safely do ZCR without worrying about unexpected program termination. The client can also skip checksum verification when reading cached data, as the data is already checksummed by the datanode when it’s cached.

The ZCR API is described in HDFS-5191. In addition to a Java API, there is also a C API that allows applications such as Impala to take full advantage of zero-copy reads.

Example CLI usage

Here’s a simple example of creating a new cache pool and adding a cache directive for a file. This example assumes you’ve already configured your cluster correctly according to the official documentation.

$ hadoop fs -put myfile /
$ hadoop fs -put myfile /
$ # Add a new cache pool and cache directive
$ hdfs cacheadmin -addPool testPool
Successfully added cache pool testPool.
$ hdfs cacheadmin -addDirective -path /myfile -pool testPool
Added cache directive 1
$ # Wait for a minute or two for the NameNode to gather all datanode cache statistics. 512 of 512 bytes of our file should be cached.
$ hdfs cacheadmin -listPools -stats testPool
Found1 result.
NAME      OWNER   GROUP   MODE            LIMIT  MAXTTL  BYTES_NEEDED  BYTES_CACHED  BYTES_OVERLIMIT  FILES_NEEDED  FILES_CACHED
testPool  andrew  andrew  rwxr-xr-x   unlimited   never           512           512                0             1
$ # Look at the datanode stats, see that our DN is using 1 page of cache
$ hdfs dfsadmin -report
...<snip>...
Live datanodes (1):
...<snip>...
ConfiguredCacheCapacity:64000(62.50 KB)
CacheUsed:4096(4 KB)
CacheRemaining:59904(58.50 KB)
CacheUsed%:6.40%
CacheRemaining%:93.60%

Future Work

There are a number of further improvements we’d like to explore. For example, a current limitation of the system is that users need to manually specify what files and directories should be cached. Instead, HDFS could automatically manage what is cached based on workload patterns or hints.

Another potential improvement would be to extend HDFS caching to output files as well as input files. One potential use case for this so-called write-caching is for intermediate stages of a multi-job pipeline. Write-caching could avoid writing to disk at all, if durability is not required. This avenue of development is being pursued in HDFS-5851.

Conclusion

Due to increasing memory capacity, many interesting working sets are able to fit in aggregate cluster memory. By using HDFS centralized cache management, applications can take advantage of the performance benefits of in-memory computation. Cluster cache state is aggregated and controlled by the NameNode, allowing applications schedulers to place their tasks for cache locality. Explicit pinning of datasets allows users to isolate their working sets from other users on shared clusters. Finally, the new zero-copy read API offers substantially improved I/O performance by allowing clients to safely skip overhead from checksumming and the read()syscall.

In a follow-up post, we’ll analyze the performance of HDFS caching using a number of micro and macro benchmarks. Stay tuned!

Colin McCabe and Andrew Wang are both Software Engineers at Cloudera, and Hadoop committers/PMC members.

References:

http://blog.cloudera.com/blog/2014/08/new-in-cdh-5-1-hdfs-read-caching/