Using Hadoop 2.2.0 plugin for Eclipse

A powerful development environment for Java-based programming is Eclipse. Eclipse is a free, open-source IDE. It supports multiple languages through a plugin interface, with special attention paid to Java. Tools designed for working with Hadoop can be integrated into Eclipse, making it an attractive platform for Hadoop development. In this section we will review how to obtain, configure, and use Eclipse.

DOWNLOADING AND INSTALLING

Note: The most current release of Eclipse is called Ganymede. Our testing shows that Ganymede is currently incompatible with the Hadoop MapReduce plugin. The most recent version which worked properly with the Hadoop plugin is version 3.3.1, “Europa.” To download Europa, do not visit the main Eclipse website; it can be found in the archive site http://archive.eclipse.org/eclipse/downloads/ as the “Archived Release (3.3.1).”

The Eclipse website has several versions available for download; choose either “Eclipse Classic” or “Eclipse IDE for Java Developers.”

Because it is written in Java, Eclipse is highly cross-platform; it is available for Windows, Linux, and Mac OS X.

Installing Eclipse is very straightforward. Eclipse is packaged as a .zip file. Windows itself can natively unzip the compressed file into a directory. If you encounter errors using the Windows decompression tool (see [1]), try using a third-party unzip utility such as 7-zip or WinRAR.

After you have decompressed Eclipse into a directory, you can run it straight from that directory with no modifications or other “installation” procedure. You may want to move it into C:\Program Files\Eclipse to keep consistent with your other applications, but it can reside on the Desktop or elsewhere as well.

INSTALLING THE HADOOP PLUGIN

  • Install Apache Ant, which is needed to build the plugin:
$ apt-get install ant
  • Go to the hadoop-plugin source directory:
$ cd src/contrib/eclipse-plugin
  • Build the plugin for Eclipse:
$ ant jar -Dversion=2.2.0 -Declipse.home=/opt/eclipse -Dhadoop.home=/usr/share/hadoop
  • The final jar file is located in hadoop2x-eclipse-plugin/build/contrib/eclipse-plugin; copy it into the Eclipse plugins folder:
$ cp hadoop-eclipse-plugin-2.2.0.jar /{Eclipse-Home}/plugins/

Note

  • eclipse.home: path to the Eclipse home directory
  • hadoop.home: path to the Hadoop 2.x home directory

MAKING A COPY OF HADOOP

While we will be running MapReduce programs on the virtual machine, we will be compiling them on the host machine. The host therefore needs a copy of the Hadoop jars to compile your code against. Copy the /hadoop-0.18.0 directory from the CD into a location on your local drive, and remember where this is. You do not need to configure this copy of Hadoop in any way.

RUNNING ECLIPSE

Navigate into the Eclipse directory and run eclipse.exe to start the IDE. Eclipse stores all of your source projects and their related settings in a directory called a workspace.

Upon starting Eclipse, it will prompt you for a directory to act as the workspace. Choose a directory name that makes sense to you and click OK.


Figure 1: When you first start Eclipse, you must choose a directory to act as your workspace.

CONFIGURING THE MAPREDUCE PLUGIN

In this section, we will walk through the process of configuring Eclipse to switch to the MapReduce perspective and connect to the Hadoop virtual machine.

Step 1: If you have not already done so, start Eclipse and choose a workspace directory. If you are presented with a “welcome” screen, click the button that says “Go to the Workbench.” The Workbench is the main view of Eclipse, where you can write source code, launch programs, and manage your projects.

Step 2: Start the virtual machine. Double-click on the image.vmx file in the virtual machine’s installation directory to launch the virtual machine. It should begin the Linux boot process.

Step 3: Switch to the MapReduce perspective. In the upper-right corner of the workbench, click the “Open Perspective” button, as shown in Figure 2:


Figure 2: Changing the Perspective

Select “Other,” followed by “Map/Reduce” in the window that opens up. At first, nothing may appear to change. In the menu, choose Window > Show View > Other. Under “MapReduce Tools,” select “Map/Reduce Locations.” This should make a new panel visible at the bottom of the screen, next to Problems and Tasks.

Step 4: Add the Server. In the Map/Reduce Locations panel, click on the elephant logo in the upper-right corner to add a new server to Eclipse.

Figure 3: Adding a New Server

You will now be asked to fill in a number of parameters identifying the server. To connect to the VMware image, the values are:

Location name: (Any descriptive name you want; e.g., "VMware server")
Map/Reduce Master Host: (The IP address printed at startup)
Map/Reduce Master Port: 50020
DFS Master Port: 9000
User name: hadoop-user

Next, click on the “Advanced” tab. There are two settings here which must be changed.

Scroll down to hadoop.job.ugi. It contains your current Windows login credentials. Highlight the first comma-separated value in this list (your username) and replace it with hadoop-user.

Next, scroll further down to mapred.system.dir. Erase the current value and set it to /hadoop/mapred/system.

When you are done, click “Finish.” Your server will now appear in the Map/Reduce Locations panel. If you look in the Project Explorer (upper-left corner of Eclipse), you will see that the MapReduce plugin has added the ability to browse HDFS. Click the [+] buttons to expand the directory tree to see any files already there. If you inserted files into HDFS yourself, they will be visible in this tree.


Figure 4: Files Visible in the HDFS Viewer

Now that your system is configured, the following sections will introduce you to the basic features and verify that they work correctly.
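
Before moving on, you can optionally verify the connection outside of Eclipse with a small Java client that lists the HDFS root directory. This is only a sketch: it assumes the Hadoop jars you copied earlier are on the classpath, and the address below is a placeholder for the IP printed by the virtual machine at startup (DFS master port 9000, as configured above).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsConnectionCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder address: substitute the IP printed by the VM at startup
        conf.set("fs.default.name", "hdfs://<vm-ip>:9000");

        // List the contents of the HDFS root directory
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}

If the paths you see match what the Project Explorer shows, the plugin and the cluster are talking to the same HDFS.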

Note: if you get the warning message “log4j:WARN Please initialize the log4j system properly”, import the log4j.properties file from $HADOOP_HOME/etc/hadoop/log4j.properties into your project to fix this warning.
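
Alternatively, a quick programmatic workaround is to configure a default console appender at the top of your driver's main() method. This is a sketch assuming log4j 1.x is on the classpath (which it is for Hadoop 2.2.0):

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class Driver {
    public static void main(String[] args) {
        // Wire up a simple console appender so log4j stops warning
        BasicConfigurator.configure();
        // Optionally reduce the verbosity of Hadoop's own loggers
        Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO);

        // ... the rest of your MapReduce driver code goes here
    }
}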


MapReduce Streaming Framework Summingbird

Available under the Apache 2 license, Summingbird is a large-scale data processing system enabling developers to uniformly execute code in either batch-mode (Hadoop/MapReduce-based) or stream-mode (Storm-based) or a combination thereof, called hybrid mode.

In order for Twitter to keep up with processing 500 million tweets and growing, they had to find a replacement for their existing stack, which required manually integrating MapReduce (Pig/Scalding) and streaming-based (Storm) code. The main motivation to create Summingbird, mentioned by the Twitter engineers, came from the realization that running a fully real-time system on Storm was difficult due to:

  • Re-computation over months of historical logs must be coordinated with Hadoop or streamed through Storm with a custom log-loading mechanism.
  • Storm is focused on message passing and random-write databases are harder to maintain.

This insight led to Summingbird, a flexible and general solution addressing the engineers’ practical issues with the existing approach:

  • Two sets of aggregation logic have to be kept in sync in two different systems
  • Keys and values must be serialized consistently between each system and the client
  • The client is responsible for reading from both datastores, performing a final aggregation and serving the combined results

Summingbird is also one of the first openly available Lambda Architecture compliant systems. Similar projects include Yahoo’s Storm-YARN and a Spanish start-up’s upcoming Lambdoop, a Java framework for developing Big Data applications in a Lambda Architecture conformant way. The characteristics of the Lambda Architecture – an immutable master dataset and the combination of batch, serving, and speed layers – enable people to build robust large-scale data processing systems that can deal with both batch and stream processing, with use cases ranging from social media platforms (such as Twitter, LinkedIn, etc.) through the Internet of Things (smart cities, wearables, manufacturing, etc.) to the financial sector (fraud detection, recommendations).

The main authors of Summingbird – Oscar Boykin, Sam Ritchie (nephew of computer science legend Dennis Ritchie) and Ashutosh Singhal – have further revealed the roadmap for Summingbird:

  • support for Apache Spark and the columnar data storage format Parquet
  • libraries of higher-level mathematics and machine learning code on top of Summingbird’s Producer primitives, and
  • deeper integration with related open source projects such as Algebird or Storehaus.

Source: InfoQ.com

Hadoop YARN

Apache™ Hadoop® YARN is a sub-project of Hadoop at the Apache Software Foundation introduced in Hadoop 2.0 that separates the resource management and processing components. YARN was born of a need to enable a broader array of interaction patterns for data stored in HDFS beyond MapReduce. The YARN-based architecture of Hadoop 2.0 provides a more general processing platform that is not constrained to MapReduce.

YARN is the future of Hadoop

As part of Hadoop 2.0, YARN takes the resource management capabilities that were in MapReduce and packages them so they can be used by new engines. This also streamlines MapReduce to do what it does best: process data. With YARN, you can now run multiple applications in Hadoop, all sharing a common resource management layer. Many organizations are already building applications on YARN in order to bring them into Hadoop.


When enterprise data is made available in HDFS, it is important to have multiple ways to process that data. With Hadoop 2.0 and YARN, organizations can use Hadoop for streaming, interactive, and a world of other Hadoop-based applications.

What YARN Does

YARN enhances the power of a Hadoop compute cluster in the following ways:

  • Scalability: The processing power in data centers continues to grow quickly. Because YARN ResourceManager focuses exclusively on scheduling, it can manage those larger clusters much more easily.
  • Compatibility with MapReduce: Existing MapReduce applications and users can run on top of YARN without disruption to their existing processes.
  • Improved cluster utilization: The ResourceManager is a pure scheduler that optimizes cluster utilization according to criteria such as capacity guarantees, fairness, and SLAs. Also, unlike before, there are no named map and reduce slots, which helps to better utilize cluster resources.
  • Support for workloads other than MapReduce: Additional programming models such as graph processing and iterative modeling are now possible for data processing. These added models allow enterprises to realize near real-time processing and increased ROI on their Hadoop investments.
  • Agility: With MapReduce becoming a user-land library, it can evolve independently of the underlying resource manager layer and in a much more agile manner.

How YARN Works

The fundamental idea of YARN is to split up the two major responsibilities of the JobTracker/TaskTracker into separate entities:

  • a global ResourceManager
  • a per-application ApplicationMaster
  • a per-node slave NodeManager, and
  • a per-application Container running on a NodeManager

The ResourceManager and the NodeManager form the new, and generic, system for managing applications in a distributed manner. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The per-application ApplicationMaster is a framework-specific entity and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the component tasks.

The ResourceManager has a scheduler, which is responsible for allocating resources to the various running applications, according to constraints such as queue capacities, user limits, etc. The scheduler performs its scheduling function based on the resource requirements of the applications.

The NodeManager is the per-machine slave, which is responsible for launching the applications’ containers, monitoring their resource usage (CPU, memory, disk, network) and reporting the same to the ResourceManager.

Each ApplicationMaster has the responsibility of negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress. From the system perspective, the ApplicationMaster runs as a normal container. For more technical information on YARN, you may enjoy our series of blog posts on the topic.
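
To make the client side of that flow concrete, below is a minimal sketch using the YARN client API from Hadoop 2.x. The application name, launch command, and resource numbers are illustrative only; a real ApplicationMaster would go on to negotiate further containers for its tasks.

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitToYarn {
    public static void main(String[] args) throws Exception {
        // Talk to the ResourceManager configured in yarn-site.xml
        YarnConfiguration conf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Ask the ResourceManager for a new application id
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("demo-app");

        // Describe the container that will host the ApplicationMaster.
        // The launch command is a placeholder for a real AM command line.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
                "sleep 60 1>/tmp/am.stdout 2>/tmp/am.stderr"));
        appContext.setAMContainerSpec(amContainer);

        // Resources requested for the ApplicationMaster container
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(256);
        capability.setVirtualCores(1);
        appContext.setResource(capability);
        appContext.setQueue("default");

        // Submit; the ApplicationMaster then runs as a normal container
        ApplicationId appId = yarnClient.submitApplication(appContext);
        System.out.println("Submitted application " + appId);
        yarnClient.stop();
    }
}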

By HortonWorks.com

Hadoop Summit Reflections

We’re finally catching our breath after a phenomenal Hadoop Summit event last week in San Jose.  Thank you to everyone that came to participate in the celebration of Hadoop advances and adoption—from many of the organizations that shared their Hadoop journey with us that fundamentally transformed their businesses, to those just getting started, to the huge ecosystem of vendors. It is amazing to be part of such a broad and deep community that is contributing to making the market for everyone.

If you didn’t have the opportunity to see them live, the keynotes have already been posted here and we’ll be posting the videos from all of the individual sessions as soon as we get them back.

In the meantime, I wanted to share some quick and simple reflections from the event.

Momentum, Momentum, Momentum

Based on the attendance —3,100+ individuals from 1,000 unique organizations— it was a special community conference and the buzz at the event made it clear that adoption is accelerating.  Vendors? Yup: 88 unique event sponsors, including Platinum Sponsors AT&T, Microsoft, SAP and Teradata—a sign that Hadoop is here to stay as a core component of the Modern Data Architecture.

And if the impromptu job board that popped up at the event is any indication, things are going to pick up pace in the coming months!


YARN Enables a Modern Data Architecture

An Apache Hadoop platform has always referred to a collection of Apache Projects, and YARN—the key addition in Hadoop 2.x —has clearly become the architectural center of Hadoop.

In many respects, Hadoop 2 has caused the inflection point in its adoption. Users who had deployed “traditional” Hadoop to underpin a particular application can now add applications 2, 3 and 4 on that same cluster—spanning batch, real-time and interactive use cases that are now enabled by YARN.  This was certainly clear from the user panel on Day 3 entitled Hadoop in the Enterprise with representatives from AIG, BNY Mellon, British Gas / Centrica, Kohls, Rogers, Target and TrueCar —all of whom are well down the path of their Hadoop 2 journey.

Not only does YARN introduce the ability to run multiple applications on a common infrastructure, it also provides the common operating system to enable a consistent approach to plug in the core requirements of operations, data access, data management, security and governance.

In the same way that UNIX liberated us from a single-process, single-user operating system to a multi-user, multi-process operating system, YARN liberates us from a single-mode, single-workload Hadoop to a multi-workload, multi-application data operating system.

In case you missed it, in order to simplify the on-boarding of components to run in Hadoop, we introduced a number of tools for ISV partners to get started with YARN.


And to ensure that the applications our customers use are deeply integrated, we introduced the YARN-Ready program: YARN Ready applications have been certified by Hortonworks to work with YARN—meaning that they can be safely deployed on a common cluster.

The Blueprint for Enterprise Hadoop

As adoption has accelerated, an even greater expectation has been placed on Security, Data Access & Data Management, Operations and Governance within the core platform.  And the good news is that through both community and vendor contributions these capabilities have been added at a phenomenal pace and will make life even simpler for mainstream users.

We think this “Blueprint for Enterprise Hadoop” represents the core capabilities that are expected of any data platform, and in the case of the Hadoop ecosystem, they are being uniquely delivered in open source.


What’s Next?

A fascinating week: if you weren’t there, stay tuned for the videos that will be posted in the coming weeks.  And if you were: thank you for participating with us and hope to see you again next year!

By: HortonWorks.com

Unified Security for the Hadoop Data Operating System across Workloads

Enterprises are using Apache Hadoop powered by YARN as a Data Operating System to run multiple workloads and use cases instead of using it just as a single purpose cluster.

A multi-purpose, enterprise-wide data platform, often referred to as a data lake, gives rise to the need for a comprehensive approach to security across the Hadoop platform and its workloads. A few weeks back, Hortonworks acquired XA Secure to further execute on our vision to bring a holistic security framework to the Hadoop community, irrespective of the workload.

As a result of the XA Secure acquisition, HDP Security now offers some additional capabilities:

Centralized Security Administration

Users can now easily manage all the security policies related to access control in one place.

Fine Grained Access Control

Administrators can use a “single pane of glass” with an easy interface to configure entitlement policies for HDFS, HBase and Hive. We support fine-grained access control, down to the column level in HBase and Hive, and the file level in HDFS.

Auditing

Audit data provides accountability and control over the Hadoop platform. We now provide a centralized audit reporting capability and the ability to get detailed audit data from HDFS, HBase and Hive. The audit data is further broken down into access audit, policy audit and agent audit data, giving a granular visibility to auditors on users’ access as well as administrative actions within the HDP Security Portal.

Last week at Hadoop Summit, we published a tutorial and a new Hortonworks Sandbox with HDP Security for you to experience these new capabilities.

Source: hortonworks.com

Learn More

  1. Download the new Sandbox
  2. Work through the Tutorial

Using Spark to Ignite Data Analytics

At eBay we want our customers to have the best experience possible. We use data analytics to improve user experiences, provide relevant offers, optimize performance, and create many, many other kinds of value. One way eBay supports this value creation is by utilizing data processing frameworks that enable, accelerate, or simplify data analytics. One such framework is Apache Spark. This post describes how Apache Spark fits into eBay’s Analytic Data Infrastructure.


What is Apache Spark?

The Apache Spark web site describes Spark as “a fast and general engine for large-scale data processing.” Spark is a framework that enables parallel, distributed data processing. It offers a simple programming abstraction that provides powerful cache and persistence capabilities. The Spark framework can be deployed through Apache Mesos, Apache Hadoop via Yarn, or Spark’s own cluster manager. Developers can use the Spark framework via several programming languages including Java, Scala, and Python. Spark also serves as a foundation for additional data processing frameworks such as Shark, which provides SQL functionality for Hadoop.

Spark is an excellent tool for iterative processing of large datasets. One way Spark is suited for this type of processing is through its Resilient Distributed Dataset (RDD). In the paper titled Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, RDDs are described as “…fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.” By using RDDs, programmers can pin their large data sets to memory, thereby supporting high-performance, iterative processing. Compared to reading a large data set from disk for every processing iteration, the in-memory solution is obviously much faster.

The diagram below shows a simple example of using Spark to read input data from HDFS, perform a series of iterative operations against that data using RDDs, and write the subsequent output back to HDFS.

Figure: Spark reads input data from HDFS, performs a series of iterative operations on it using RDDs, and writes the output back to HDFS

In the case of the first map operation into RDD(1), not all of the data could fit within the memory space allowed for RDDs. In such a case, the programmer is able to specify what should happen to the data that doesn’t fit. The options include spilling the computed data to disk and recreating it upon read. We can see in this example how each processing iteration is able to leverage memory for the reading and writing of its data. This method of leveraging memory is likely to be 100X faster than other methods that rely purely on disk storage for intermittent results.
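
As a rough sketch of that choice, the snippet below (using Spark's Java API; the HDFS path and the ERROR filter are invented for the example) persists an RDD with a storage level that lets partitions which do not fit in memory spill to disk, so later passes re-read the spilled partitions instead of recomputing them:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;

public class PersistExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("persist-demo"));

        // Read a (hypothetical) input file from HDFS
        JavaRDD<String> lines = sc.textFile("hdfs:///data/input");

        // Keep the RDD in memory, but allow partitions that don't fit
        // to spill to disk instead of being recomputed on each pass.
        lines.persist(StorageLevel.MEMORY_AND_DISK());

        // Multiple passes now reuse the cached partitions
        long total = lines.count();
        long errors = lines.filter(new Function<String, Boolean>() {
            public Boolean call(String line) { return line.contains("ERROR"); }
        }).count();

        System.out.println(errors + " errors out of " + total + " lines");
        sc.stop();
    }
}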

Apache Spark at eBay

Today Spark is most commonly leveraged at eBay through Hadoop via Yarn. Yarn manages the Hadoop cluster’s resources and allows Hadoop to extend beyond traditional map and reduce jobs by employing Yarn containers to run generic tasks. Through the Hadoop Yarn framework, eBay’s Spark users are able to leverage clusters approaching the range of 2000 nodes, 100TB of RAM, and 20,000 cores.

The following example illustrates Spark on Hadoop via Yarn.

Figure: Spark running on Hadoop via YARN

The user submits the Spark job to Hadoop. The Spark application master starts within a single Yarn container, then begins working with the Yarn resource manager to spawn Spark executors – as many as the user requested. These Spark executors will run the Spark application using the specified amount of memory and number of CPU cores. In this case, the Spark application is able to read and write to the cluster’s data residing in HDFS. This model of running Spark on Hadoop illustrates Hadoop’s growing ability to provide a singular, foundational platform for data processing over shared data.

The eBay analyst community includes a strong contingent of Scala users. Accordingly, many of eBay’s Spark users are writing their jobs in Scala. These jobs are supporting discovery through interrogation of complex data, data modelling, and data scoring, among other use cases. Below is a code snippet from a Spark Scala application. This application uses Spark’s machine learning library, MLlib, to cluster eBay’s sellers via KMeans. The seller attribute data is stored in HDFS.

/**
 * read input files and turn into usable records
 */
 var table = new SellerMetric()
 val model_data = sc.sequenceFile[Text,Text](
   input_path
  ,classOf[Text]
  ,classOf[Text]
  ,num_tasks.toInt
 ).map(
   v => parseRecord(v._2,table)
 ).filter(
   v => v != null
 ).cache

....

/**
 * build training data set from sample and summary data
 */
 val train_data = sample_data.map( v =>
   Array.tabulate[Double](field_cnt)(
     i => zscore(v._2(i),sample_mean(i),sample_stddev(i))
   )
 ).cache

/**
 * train the model
 */ 
 val model = KMeans.train(train_data,CLUSTERS,ITERATIONS)
  
/**
 * score the data
 */
 val results = grouped_model_data.map( 
   v => (
     v._1
    ,model.predict(
       Array.tabulate[Double](field_cnt)(
         i => zscore(v._2(i),sample_mean(i),sample_stddev(i))
       )
     )
   )
 ) 
 results.saveAsTextFile(output_path)

In addition to  Spark Scala users, several folks at eBay have begun using Spark with Shark to accelerate their Hadoop SQL performance. Many of these Shark queries are easily running 5X faster than their Hive counterparts. While Spark at eBay is still in its early stages, usage is in the midst of expanding from experimental to everyday as the number of Spark users at eBay continues to accelerate.

The Future of Spark at eBay

Spark is helping eBay create value from its data, and so the future is bright for Spark at eBay. Our Hadoop platform team has started gearing up to formally support Spark on Hadoop. Additionally, we’re keeping our eyes on how Hadoop continues to evolve in its support for frameworks like Spark, how the community is able to use Spark to create value from data, and how companies like Hortonworks and Cloudera are incorporating Spark into their portfolios. Some groups within eBay are looking at spinning up their own Spark clusters outside of Hadoop. These clusters would either leverage more specialized hardware or be application-specific. Other folks are working on incorporating eBay’s already strong data platform language extensions into the Spark model to make it even easier to leverage eBay’s data within Spark. In the meantime, we will continue to see adoption of Spark increase at eBay. This adoption will be driven by chats in the hall, newsletter blurbs, product announcements, industry chatter, and Spark’s own strengths and capabilities.

Source: ebaytechblog.com

How to Analyze Server Logs with Cascading and Apache Hadoop on Hortonworks Data Platform

This is the second in a series of blogs exploring how to write data-driven applications in Java using the Cascading SDK. The posts in the series are:

  1. WordCount
  2. Log Parsing

Historically, programming languages and software frameworks have evolved in a singular direction, with a singular purpose: to achieve simplicity, hide complexity, improve developer productivity, and make coding easier. And in the process, foster innovation to the degree we have seen today—and benefited from.

Is anyone among you “young” enough to admit to writing code in microcode and assembly language?

Yours truly wrote his first lines of “Hello World” in assembly language on the VAX and PDP 11, the same computer hardware (and software) that facilitated the genesis of “C” and “UNIX” at Bell Labs. Indisputably, we have come a long way from microcode to assembly, from C to Java, which has facilitated writing high-level abstraction frameworks and enabled innovative technologies, such as J2EE web services frameworks and the Apache Hadoop and MapReduce computing frameworks, to mention a few.

Add to that long list the Cascading Java SDK for writing data-driven applications for Apache Hadoop running on the Hortonworks Data Platform (HDP). Even more, Cascading 3.0 will support Apache Tez, the high-performance parallel execution engine.

Data Flow and Data Pipeline

In the previous blog, I explored the genesis of Java Cascading Framework. I argued that at the core of any data-driven application, there exists a pattern of data transformation that mimics aspects of Extract, Transform, and Load operations (ETL).


I showed how the Cascading framework embraces those common ETL operations by providing high-level logical building blocks as Java composite classes, allowing a developer to write data-driven apps without resorting to or knowing about the MapReduce Java API or the underlying Apache Hadoop infrastructure's complexity.

Parsing Logs with MapReduce

In this blog we examine a common use case: reading, parsing, transforming, sorting, storing, and extracting data value from a large server log. The value extracted is the list of the top ten in-bound IP addresses. For this example, we’ve curtailed one server log to 160 MB. In reality, these could be weeks of server logs, with gigabytes of data.

Figure: the log-processing data flow (read, parse, group by IP, count, sort, and keep the top ten)

Keeping the above flow in mind, we can write a very simple Java MapReduce program without writing to the Java MapReduce API and without knowledge of the underlying Apache Hadoop complexity. For example, below is a complete source listing of the above transformation, in less than 40 lines of code: that’s simple!

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.filter.Sample;
import cascading.operation.filter.Limit;
import cascading.operation.regex.RegexParser;
import cascading.operation.text.DateParser;
import cascading.pipe.*;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import java.util.Properties;
 
public class Main {
    public static void main(String[] args) {
 
    	// input (taps) and output (sinks)
        String inputPath 		= args[0];
        String outputPath 	= args[1];
        // sources and sinks
        Tap inTap 	= new Hfs(new TextLine(), inputPath);
        Tap outTap  = new Hfs(new TextDelimited(true, "\t"), outputPath, SinkMode.REPLACE);
        // Parse the line of input and break them into five fields
        RegexParser parser = new RegexParser(new Fields("ip", "time", "request", "response", "size"), 
        		"^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$", new int[]{1, 2, 3, 4, 5});
        // Create a pipe for processing each line at a time
        Pipe processPipe = new Each("processPipe", new Fields("line"), parser, Fields.RESULTS);
        // Group the stream within the pipe by the field "ip"
        processPipe = new GroupBy(processPipe, new Fields("ip"));
        // Aggregate each "ip" group using the Cascading built in Count function
        processPipe = new Every(processPipe, Fields.GROUP, new Count(new Fields("IPcount")), Fields.ALL);
        // After aggregation counter for each "ip," sort the counts
        Pipe sortedCountByIpPipe = new GroupBy(processPipe, new Fields("IPcount"), true);
        // Limit them to the first 10, in the descending order
        sortedCountByIpPipe = new Each(sortedCountByIpPipe, new Fields("IPcount"), new Limit(10));
        // Join the pipe together in the flow, creating inputs and outputs (taps)
        FlowDef flowDef = FlowDef.flowDef()
    		   .addSource(processPipe, inTap)
    		   .addTailSink(sortedCountByIpPipe, outTap)
    		   .setName("DataProcessing");
        Properties properties = AppProps.appProps()
        		.setName("DataProcessing")
        		.buildProperties();
        Flow parsedLogFlow = new HadoopFlowConnector(properties).connect(flowDef);
        //Finally, execute the flow.
        parsedLogFlow.complete();
    }
}

Code Walk

First, we create taps (sources and sinks) using the Hfs() constructor, and specify how we want each line to be split, using RegexParser(), into five fields. Second, we create a series of Pipes() to process each line, aggregate (GroupBy()) the IPs, sort the IP counts, and limit the output to the top ten IP addresses. Third, we connect the pipes into a FlowDef and create a Flow with HadoopFlowConnector(), supplying the pipes, input taps, and output taps. And finally, we execute the flow with Flow.complete().

So in four easy programming steps we created a MapReduce program without a single reference to the MapReduce API.  Instead, we used only high-level logical constructs and classes in Cascading SDK.

Compiling the jar file and running it on the HDP 2.1 Sandbox resulted in the following output. Note the number of MapReduce jobs and the final submission to YARN.

Figure: console output from the run on the HDP 2.1 Sandbox, showing the MapReduce jobs and the final submission to YARN

What Next?

To get your feet wet and whet your appetite, you can try out the Cascading Impatient series.

By Jules S. Damji, HortonWorks.com

Apache Spark: The Next Big Data Thing?

Apache Spark is generating quite some buzz right now. Databricks, the company founded to support Spark, raised $14M from Andreessen Horowitz, Cloudera has decided to fully support Spark, and others chime in that it’s the next big thing. So I thought it was high time I took a look to understand what the whole buzz is about.

I played around with the Scala API (Spark is written in Scala), and to be honest, at first I was pretty underwhelmed, because Spark looked, well, so small. The basic abstraction is the Resilient Distributed Dataset (RDD), basically a distributed immutable collection, which can be defined based on local files or files stored on Hadoop via HDFS, and which provides the usual Scala-style collection operations like map, foreach and so on.

My first reaction was “wait, is this basically just distributed collections?” Hadoop in comparison seemed to be so much more: a distributed filesystem, obviously map reduce, with support for all kinds of data formats, data sources, unit testing, clustering variants, and so on and so on.

Others quickly pointed out that there’s more to it, in fact, Spark also provides more complex operations like joins, group-by, or reduce-by operations so that you can model quite complex data flows (without iterations, though).

Over time it dawned on me that the perceived simplicity of Spark actually said a lot more about the Java API of Hadoop than Spark. Even simple examples in Hadoop usually come with a lot of boilerplate code. But conceptually speaking, Hadoop is quite simple as it only provides two basic operations, a parallel map, and a reduce operation. If expressed in the same way on something resembling distributed collections, one would in fact have an even smaller interface (some projects like Scalding actually build such things and the code looks pretty similar to that of Spark).

So after convincing myself that Spark actually provides a non-trivial set of operations (really hard to tell just from the ubiquitous word count example), I dug deeper and read this paper which describes the general architecture. RDDs are the basic building block of Spark and really are something like distributed immutable collections. These define operations like map or foreach which are easily parallelized, but also join operations which take two RDDs and collect entries based on a common key, as well as reduce-by operations which aggregate entries using a user-specified function based on a given key. In the word count example, you’d map a text to all the words with a count of one, and then reduce them by key using the word and summing up the counts to get the word counts. RDDs can be read from disk but are then held in memory for improved speed, where they can also be cached so you don’t have to reread them every time. That alone adds a lot of speed compared to Hadoop, which is mostly disk-based.
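
As a rough illustration, the sketch below shows what that word-count flow can look like against Spark's Java API (Spark 1.x style, with anonymous classes rather than lambdas); the HDFS input and output paths are placeholders:

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public class WordCount {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("word-count"));

        // Read the text from HDFS (placeholder path)
        JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input.txt");

        // Map each line to its words, each word to (word, 1),
        // then reduce by key to sum the counts per word.
        JavaPairRDD<String, Integer> counts = lines
            .flatMap(new FlatMapFunction<String, String>() {
                public Iterable<String> call(String line) {
                    return Arrays.asList(line.split("\\s+"));
                }
            })
            .mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String word) {
                    return new Tuple2<String, Integer>(word, 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) {
                    return a + b;
                }
            });

        counts.saveAsTextFile("hdfs:///tmp/wordcounts");
        sc.stop();
    }
}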

Now what’s interesting is Spark’s approach to fault tolerance. Instead of persisting or checkpointing intermediate results, Spark remembers the sequence of operations which led to a certain data set. So when a node fails, Spark reconstructs the data set based on the stored information. They argue that this is actually not that bad because the other nodes will help in the reconstruction.

So in essence, compared to bare Hadoop, Spark has a smaller interface (which might still become similarly bloated in the future), but there are many projects on top of Hadoop (like Twitter’s Scalding, for example) which achieve a similar level of expressiveness. The other main difference is that Spark is in-memory by default, which naturally leads to a large improvement in performance, and even allows you to run iterative algorithms. Spark has no built-in support for iterations, though; it’s just that they claim it’s so fast that you can run iterations if you want to.

Spark Streaming – return of the micro-batch

Spark also comes with a streaming data processing model, which got me quite interested, of course. There is again a paper which summarizes the design quite nicely. Spark follows an interesting and different approach compared to frameworks like Twitter’s Storm. Storm is basically like a pipeline into which you push individual events, which then get processed in a distributed fashion. Instead, Spark follows a model where events are collected and then processed at short time intervals (let’s say every 5 seconds) in a batch manner. The collected data become an RDD of their own, which is then processed using the usual set of Spark operations.

The authors claim that this mode is more robust against slow nodes and failures, and also that the 5-second intervals are usually fast enough for most applications. I’m not so sure about this, as distributed computing is always pretty complex and I don’t think you can easily say that one approach is generally better than another. This approach also nicely unifies the streaming with the non-streaming parts, which is certainly true.
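
To make the micro-batch model concrete, here is a small sketch against Spark Streaming's Java API; the socket source on port 9999 is just a stand-in for a real event feed, and the five-second batch interval mirrors the example above:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class MicroBatchExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("micro-batch-demo");

        // Collect incoming events into 5-second micro-batches;
        // each batch becomes an RDD processed with normal Spark operations.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));

        // A hypothetical text source on port 9999 (e.g. fed by a log shipper)
        JavaDStream<String> events = jssc.socketTextStream("localhost", 9999);

        // Count events per 5-second batch and print a sample to stdout
        JavaDStream<Long> counts = events.count();
        counts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}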

Final thoughts

What I saw looked pretty promising, and given the support and attention Spark receives, I’m pretty sure it will mature and become a strong player in the field. It’s not well-suited for everything, though. As the authors themselves admit, it’s not really well suited to operations which require changing only a few entries in the data set at a time, due to the immutable nature of the RDDs. In principle, you have to make a copy of the whole data set even if you just want to change one entry. This can be nicely parallelized, but is of course costly. More efficient implementations based on copy-on-write schemes might also work here, but are not implemented yet, if I’m not mistaken.

Stratosphere is a research project at TU Berlin which has similar goals, but takes the approach even further by including more complex operations like iterations and not only storing the sequence of operations for fault tolerance, but also using them for global optimization of the scheduling and parallelization.

Immutability is pretty in vogue here as it’s easier to reason about, but I’d like to point you to this excellent article by Baron Schwartz on how you’ll always end up with mixed strategies (mutable and immutable data) to make it work in the real world.

By Mikio L. Braun, mikiobraun.de

Ambari Blueprints Provision Clusters with Greater Speed and Ease

Apache Ambari has always provided an operator the ability to provision an Apache Hadoop cluster using an intuitive Cluster Install Wizard web interface, guiding the user through a series of steps:

  • confirming the list of hosts
  • assigning master, slave, and client components, configuring services, and
  • installing, starting and testing the cluster.

With Ambari Blueprints, system administrators and dev-ops engineers can expedite the process of provisioning a cluster. Once defined, Blueprints can be re-used, which facilitates easy configuration and automation for each successive cluster creation.

Best Practices, From Experience

Hortonworks has worked with countless enterprise customers and partners deploying Hadoop within their data centers. From this experience we distilled many best practices for how to distribute the Hadoop components and stand up clusters.

The number of Hadoop deployments continues to grow within the enterprise. These clusters come in many types: production, development, test and discovery. Some clusters are permanent and some are ad hoc, launched for a short period of time for a specific task.

This variability means that installing a cluster through a web interface can be a time-consuming task for an administrator to make sure each cluster is configured correctly for the intended use case. Combine that with the Apache Hadoop ecosystem’s rapid innovation and expansion, and there is greater and greater demand for a higher level of consistent and automated cluster provisioning.

Ambari Blueprints Make Cluster Provisioning Easier & Repeatable

The Ambari Blueprints feature in recently released Apache Ambari 1.6.0 is a significant step forward. Blueprints facilitate instantiation of a Hadoop cluster quickly and without requiring user interactions. And because Blueprints contain knowledge around service component layout for a particular Stack definition, they preserve best practices across different environments. Blueprints ensure that those best practices for service component layout and configuration are consistently applied across clusters in multiple environments (dev, test, prod) and multiple data centers.

How Blueprints Work

A blueprint document is in JSON format as shown below. It defines the service components to install on a given host group, the configurations to use, and which Stack to use.

Figure 1 Blueprint Structure 

"configurations" : [
    {
      "configuration-type" : {
          "property-name"  : "property-value",
          "property-name2" : "property-value"
      }
    },
    {
      "configuration-type2" : {
          "property-name" : "property-value"
      }
    }
    ...
  ],
  "host_groups" : [
    {
      "name" : "host-group-name",
      "components" : [
        {
          "name" : "component-name"
        },
        {
          "name" : "component-name2"
        }
        ...
      ],
      "configurations" : [
        {
          "configuration-type" : {
            "property-name" : "property-value"
          }
        }
        ...
      ],
      "cardinality" : "1"
    }
  ],
  "Blueprints" : {
    "stack_name" : "HDP",
    "stack_version" : "2.1"
  }
}

To create a new cluster, an Ambari Blueprint must be combined with environment-specific host information and configuration within a cluster creation template. A cluster creation template is also in JSON format as shown below. It defines the specific blueprint to follow, the hosts that are to be mapped to a given host group, and the configurations to use.

Figure 2 Cluster Creation Template Structure 

{
  "blueprint" : "blueprint-name",
  "default_password" : "super-secret-password",
  "configurations" : [
    {
      "configuration-type" : {
        "property-name" : "property-value"
      }
    }
    ...
  ],
  "host_groups" :[
    {
      "name" : "host-group-name",
      "configurations" : [
        {
          "configuration-type" : {
            "property-name" : "property-value"
          }
        }
      ],
      "hosts" : [
        {
          "fqdn" : "host.domain.com"
        },
        {
          "fqdn" : "host2.domain.com"
        }
        ...
      ]
    }
    ...
  ]
}

Configurations defined in a cluster creation template will override any duplicate configurations specified at a blueprint level when the cluster is created. Not all configurations have valid defaults. Therefore, the Blueprint user must provide the required properties. For example, Ambari validates non-password required properties at blueprint creation time and required password properties at cluster creation time.

Monitor Cluster Progress and Inspection via Blueprint API

Once you have those blueprint assets, it’s time to call the API. Ambari will return a request href for you to “watch” the progress of all the installation and configuration tasks.
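
For illustration, the sketch below drives those two REST calls from Java. The Ambari host, credentials, and resource names are placeholders, and the blueprint and cluster creation template are read from files given on the command line; Ambari expects the X-Requested-By header on write requests.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.bind.DatatypeConverter;

public class AmbariBlueprintClient {

    // POST a JSON document to the Ambari REST API using basic auth
    static int post(String url, String user, String pass, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("X-Requested-By", "ambari");
        String token = DatatypeConverter.printBase64Binary((user + ":" + pass).getBytes("UTF-8"));
        conn.setRequestProperty("Authorization", "Basic " + token);
        OutputStream out = conn.getOutputStream();
        out.write(json.getBytes("UTF-8"));
        out.close();
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws Exception {
        String ambari = "http://ambari-host:8080";   // placeholder Ambari server
        String blueprint = new String(Files.readAllBytes(Paths.get(args[0])), "UTF-8");
        String clusterTemplate = new String(Files.readAllBytes(Paths.get(args[1])), "UTF-8");

        // 1. Register the blueprint under a chosen name
        System.out.println(post(ambari + "/api/v1/blueprints/my-blueprint",
                "admin", "admin", blueprint));

        // 2. Create the cluster from the blueprint; the response body contains
        //    the request href used to watch installation progress
        System.out.println(post(ambari + "/api/v1/clusters/my-cluster",
                "admin", "admin", clusterTemplate));
    }
}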

Figure 3 Blueprint API Calls

Ambari Server provides the following API resources for managing blueprints and creating clusters from them.

Figure 4 Basic API Resources


Ambari Blueprints address the need to codify and enforce best practices when deploying Hadoop clusters within the enterprise. They store environment-specific information and configuration separately and can easily manage consistent deployment across clusters of all sizes. Finally, the API also removes the administrator from the provisioning process and fully automates the steps involved in creating a cluster.

Three Steps to Get Started

  1. Grab the Ambari 1.6.0 release
  2. Dig into the Ambari Blueprints wiki
  3. Start building new clusters!

By HortonWorks.com

Spark, Storm and Real Time Analytics

Big Data analytics has been advancing in the past years as the amount of information has exploded. Hadoop is definitely the platform of choice for Big Data analysis and computation. But while data Volume, Variety and Velocity increase, Hadoop as a batch processing framework cannot cope with the requirement for real-time analytics.

Databricks, the company behind Apache Spark, recently raised $14 million to accelerate development of Spark and Shark. Spark is an engine for large-scale data processing written in Scala, while Shark is a Hive-compatible variation of Spark.

Like Spark, Storm also aims to get around Hadoop’s batch nature by providing event processing and distributed computation capabilities. By designing a topology of transformations in a Directed Acyclic Graph, the architect can perform arbitrarily complex computations, one transformation at a time.

Nathan Marz experienced it first hand and came up with the lambda architecture paradigm to solve this fundamental architectural problem. Lambda architecture consists of a serving layer that gets updated infrequently from the batch layer and a speed layer that computes real time analytics to compensate for the slow batch layer. Essentially, Hadoop is computing analytics in batches and in between batch runs, the speed layer is incrementally updating metrics by examining events in a streaming fashion.

Both Spark and Storm can operate in a Hadoop cluster and access Hadoop storage. Storm-YARN is Yahoo’s open source implementation of Storm and Hadoop convergence. Spark is providing native integration for Hadoop. Integration with Hadoop is achieved through YARN (NextGen MapReduce). Integrating real time analytics with Hadoop based systems allows for better utilization of cluster resources through computational elasticity and being in the same cluster means that network transfers can be minimal.

In terms of commercial support, Cloudera has already announced support for Spark andincluded it in CDH (Cloudera’s Distribution Including Apache Hadoop). Hortonworks is planning to include Apache Storm in HDP (Hortonworks Data Platform) in the first half of 2014.