Tag Archives: Big Data

Learn how the MapReduce algorithm works with a simple example (Tutorial Day 7)

MapReduce Algorithm

MapReduce is a distributed data processing algorithm introduced by Google. It is mainly inspired by the functional programming model. MapReduce is mainly useful for processing huge amounts of data in a parallel, reliable and efficient way in cluster environments. It uses a “divide and conquer” approach to process large amounts of data: it divides the input task into smaller, manageable sub-tasks and executes them in parallel.

MapReduce Algorithm Steps

MapReduce Algorithm uses the following three main steps:

Map Function: The Map Function is the first step in the MapReduce algorithm. It takes input tasks (say, DataSets; only one DataSet is shown in the diagram below) and divides them into smaller sub-tasks. It then performs the required computation on each sub-task in parallel.

This step performs the following two sub-steps:

  • Splitting: The Splitting step takes the input DataSet from the source and divides it into smaller Sub-DataSets.
  • Mapping: The Mapping step takes those smaller Sub-DataSets and performs the required action or computation on each Sub-DataSet.

The output of the Map Function is a set of key-value pairs, written as <Key, Value>, as shown in the diagram below.

Sort & Shuffle Function:

It is the second step in the MapReduce algorithm. The Shuffle Function is sometimes loosely called the “Combine Function” (not to be confused with Hadoop's optional combiner, which runs on the map side). It takes the list of outputs coming from the Map Function and performs the following two sub-steps on every key-value pair:

  1. Merging: combines all key-value pairs that have the same key (that is, it groups key-value pairs by comparing the “Key”). This step returns <Key, List<Value>>.
  2. Sorting: takes the output of the Merging step and sorts all key-value pairs by key. This step also returns <Key, List<Value>>, but with the pairs sorted by key.

 

Reduce Function: It is the final step in the MapReduce algorithm. It performs only one step, the Reduce step. It takes the list of sorted <Key, List<Value>> pairs from the Shuffle Function and performs the reduce operation, as shown below.

The final step's output looks like the first step's output. However, the final <Key, Value> pairs differ from the first step's <Key, Value> pairs: they are computed (aggregated) and sorted.

We can observe the difference between the first step's output and the final step's output with a simple example.

Example 1: Word Counting

Problem Statement:
Count the number of occurrences of each word in a DataSet. In the example below the words are fruit names, so we count how many times each fruit appears.

Input DataSet
Below is an example input DataSet file. This is a small set of data; real-world applications process very large amounts of data.

[Image: i1]

Output: Counts of each fruit.

How it works:

Step 1: MapReduce – Map Function (Split Step)

[Image: i2]

Step 2: MapReduce – Map Function (Mapping Step)

[Image: i3]

Step 3: MapReduce – Sort & Shuffle Function

[Image: i4]

Step 4: MapReduce – Reduce Function

[Image: i5]

So we can summarize what we learned from this example as:

  1. The input dataset can be divided into n chunks, depending on the amount of data and the block size.
  2. In the Map function, all the chunks are processed at the same time, which is what makes the processing parallel.
  3. In the shuffle step, key-value pairs with the same key are grouped together, which leads to aggregation of similar patterns.
  4. Finally, the reducers combine the grouped values to produce a consolidated output according to the reduce logic.
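
The four steps above can be sketched in plain Python. This is only a single-machine illustration, not Hadoop code: the function names (split, map_chunk, shuffle_and_sort, reduce_counts) and the sample fruit lines are made up for this example, and the chunks are processed one after another here, whereas a real cluster would process them in parallel.

```python
from collections import defaultdict


def split(dataset, num_chunks):
    """Split step: divide the input lines into roughly equal chunks."""
    size = max(1, -(-len(dataset) // num_chunks))  # ceiling division
    return [dataset[i:i + size] for i in range(0, len(dataset), size)]


def map_chunk(chunk):
    """Mapping step: emit a (word, 1) pair for every word in the chunk."""
    return [(word, 1) for line in chunk for word in line.split()]


def shuffle_and_sort(mapped_pairs):
    """Merge pairs that share a key, then sort by key: [(key, [values])]."""
    grouped = defaultdict(list)
    for key, value in mapped_pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_counts(grouped):
    """Reduce step: sum the list of values collected for each key."""
    return [(key, sum(values)) for key, values in grouped]


def word_count(dataset, num_chunks=3):
    chunks = split(dataset, num_chunks)
    # On a cluster each chunk would be mapped on a different node.
    mapped = [pair for chunk in chunks for pair in map_chunk(chunk)]
    return reduce_counts(shuffle_and_sort(mapped))


# Hypothetical input: three lines of fruit names.
sample = ["apple orange mango", "orange grapes plum", "apple plum mango"]
print(word_count(sample))
# [('apple', 2), ('grapes', 1), ('mango', 2), ('orange', 2), ('plum', 2)]
```

Note how the map output is a flat list of <Key, Value> pairs, the shuffle output is <Key, List<Value>>, and the reduce output is again <Key, Value>, but now aggregated and sorted.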

 

In the next tutorial we will discuss what Sqoop is.


Spark Components and Architecture (Day 2)

So here we are today, in the Day 2 tutorial for learning Spark. As we all know, Spark is a top-level project of the Apache Software Foundation, designed to be used with a range of programming languages and on a variety of architectures. Spark’s speed, simplicity, and broad support for existing development environments and storage systems make it increasingly popular with a wide range of developers, and relatively accessible to those learning to work with it for the first time.

To make Spark easy to learn and to incorporate into existing applications as straightforwardly as possible, it was developed to support many programming languages, including Java, Python, Scala, SQL and R. Spark is easy to download and install on a laptop or virtual machine. Spark was built to run in a couple of different ways: standalone, or as part of a cluster. For production workloads operating at scale, Spark needs to run on a big data cluster. These clusters are often also used for Hadoop jobs, and Hadoop’s YARN resource manager will generally be used to manage that Hadoop cluster (including Spark). Spark can also run just as easily on clusters controlled by Apache Mesos. A series of scripts bundled with current releases of Spark simplifies the process of launching Spark on Amazon Web Services’ Elastic Compute Cloud (EC2).

(source:internet)
Spark Architecture

The Spark architecture, or stack, currently comprises Spark Core and four libraries that are optimized to address the requirements of four different use cases. Individual applications will typically require Spark Core and at least one of these libraries.

What are Spark Components?

Spark Core: It is a general-purpose system providing basic functionality such as task scheduling, distribution, fault recovery, interaction with storage systems and monitoring of applications across a cluster. Spark Core is also home to the API that defines resilient distributed datasets (RDDs), which are Spark’s main programming abstraction.

On top of the core sit the components that are designed to interoperate closely. The benefit of such a stack is that all the higher-layer components inherit the improvements made at the lower layers. For example, an optimization to Spark Core will speed up the SQL, streaming, machine learning and graph processing libraries as well.

  1. Spark Streaming : This module enables scalable and fault-tolerant processing of streaming data, and can integrate with established sources of data streams like Flume. Examples of data streams include log files generated by production web servers, or queues of messages containing status updates posted by users of a web service.
  2. Spark SQL: This module is for working with structured data. It allows querying data via SQL as well as the Apache Hive variant of SQL, called the Hive Query Language (HQL), and it supports many sources of data, including Hive tables, Parquet, and JSON. Spark SQL also supports JDBC and ODBC connections, enabling a degree of integration with existing databases, data warehouses and business intelligence tools.
  3. GraphX: It supports analysis of and computation over graphs of data (e.g., a social network’s friend graph) and performing graph-parallel computations. Like Spark Streaming and Spark SQL, it extends the Spark RDD API, allowing us to create a directed graph with arbitrary properties attached to each vertex and edge. It provides various operators for manipulating graphs (e.g., subgraph and mapVertices) and a library of common graph algorithms (e.g., PageRank and triangle counting).
  4. Spark MLlib: Spark comes with a library containing common machine learning (ML) functionality, called MLlib. It provides multiple types of machine learning algorithms, including classification, regression, clustering, and collaborative filtering, as well as supporting functionality such as model evaluation and data import.

What are Resilient Distributed Datasets (RDDs)? Click here for the Day 3 tutorial 🙂

Learn Sqoop..!! (tutorial Day 8)

As a part of the Hadoop ecosystem, Sqoop is an important interaction tool. When Big Data stores and analyzers of the Hadoop ecosystem such as MapReduce, Hive, HBase, Cassandra, Pig, etc. came into the picture, they required a tool to interact with relational database servers for importing and exporting the Big Data residing in them. Sqoop provides this interaction between relational database servers and Hadoop’s HDFS.

What is Sqoop?

Sqoop is an open source tool designed to transfer data between Hadoop (Hive/HDFS or HBase) and relational database servers. It is used to import data from structured data stores or relational databases such as MySQL and Oracle into Hadoop ecosystem components like Hive, HDFS or HBase, and to export data from the Hadoop file system back to relational databases and enterprise data warehouses. Sqoop works with structured, relational databases such as Teradata, Netezza, Oracle, MySQL and Postgres. If your database doesn’t fall into this category, you can still use Sqoop through its extension framework, connectors. You can find connectors online and modify their code, or write your own using the framework. Generally, JDBC connectivity is available for most databases, so this usually resolves the issue.

Sqoop: “SQL to Hadoop and Hadoop to SQL”

Why is Sqoop used?

Sqoop doesn’t have any server; it is a client library. So it doesn’t matter whether you run it from a data node or from anywhere else. It will find the Hadoop installation locally, or you can specify the Hadoop installation, and then it will find the name node and run from there.

Sqoop uses the MapReduce framework to import and export data, which provides a parallel mechanism as well as fault tolerance. Sqoop makes developers' lives easier by providing a command line interface. Developers just need to provide basic information such as the source, destination and database authentication details in the sqoop command. Sqoop takes care of the rest.

Sqoop provides many salient features like:

  1. Full Load
  2. Incremental Load
  3. Parallel import/export
  4. Import results of SQL query
  5. Connectors for all major RDBMS Databases
  6. Kerberos Security Integration
  7. Load data directly into Hive/HBase

What are Connectors?

Sqoop has connectors, which are pluggable components that use the extension framework to enable Sqoop to import or export data between Hadoop and data stores. The most basic connector that ships with Sqoop is the Generic JDBC Connector, and as the name suggests, it uses only the JDBC interface for accessing metadata and transferring data. Available connectors include Oracle, DB2, MySQL, PostgreSQL, Teradata and JDBC.

Sqoop Architecture

 

How is Sqoop used? What can we import/export?

Sqoop can be used to import/export:

  1. An entire table
  2. Part of a table, or just selected data, using a WHERE clause
  3. All tables of a database

We can also use a few other Sqoop commands and options, such as eval (evaluate), options-file (read command arguments from a file), all-databases, all-tables and many more.

Sqoop reads the table row by row into HDFS. The output of this import process is a set of files containing a copy of the imported table. Since the import is a parallel process, the output will be in multiple files. These files may be delimited text files, or binary Avro or sequence files. Sqoop fetches metadata from the database table and calculates the minimum and maximum values of the table's primary key to identify the data range. This range helps Sqoop divide the load between mappers. By default it uses 4 mappers and no reducers.

Sqoop is built on MapReduce logic and uses JDBC APIs to generate Java source and class files from this metadata, and at the end packages them into a JAR file. So once the import is complete you will see 3 files created, for example: Employee.java, employee.class and employee.jar

Let’s learn how to use Sqoop to import tables. Let's assume you have a MySQL database (RDBMS) and you are trying to import an Employee table from it into HDFS.
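
To make the idea of dividing the primary key range between mappers concrete, here is a minimal Python sketch. It is not Sqoop's actual splitting code; the function name split_key_range and the column name id are hypothetical, and it assumes an integer primary key.

```python
def split_key_range(min_key, max_key, num_mappers=4):
    """Divide the inclusive range [min_key, max_key] into contiguous slices,
    one per mapper, with sizes that differ by at most one key."""
    total = max_key - min_key + 1
    base, extra = divmod(total, num_mappers)
    ranges = []
    start = min_key
    for i in range(num_mappers):
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue  # more mappers than keys: leave this mapper idle
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


# Hypothetical table whose primary key "id" runs from 1 to 1000.
for low, high in split_key_range(1, 1000):
    print(f"mapper query: ... WHERE id >= {low} AND id <= {high}")
```

With the default of 4 mappers, each mapper ends up reading a quarter of the key range, which is why an evenly distributed primary key gives the most balanced import.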

Command Syntax:

sqoop import --connect jdbc:mysql://localhost/databasename --username $USER_NAME --password $PASSWORD --table tablename -m 1

Example:

$ sqoop import --connect jdbc:mysql://localhost/scoop_db --username scp --password scp123 --table employee -m 1

Here we specify the:

  • database path (localhost)
  • database name (scoop_db)
  • connection protocol (jdbc:mysql:)
  • username (scp)
  • password (there are several ways to provide the password, such as on the command line or stored in a file that Sqoop reads)
  • Use a double hyphen ‘--’ for tool arguments such as --connect, --username and --password
  • Use a single hyphen ‘-’ for short options such as -m and for generic Hadoop arguments such as -files

To verify the imported data in HDFS, use the following command

(syntax from internet).
$ $HADOOP_HOME/bin/hadoop fs -cat /employee/part-m-*

It will show the fields and data, separated by commas.

Now let's look at various syntaxes and examples:

  • Import an entire table:

sqoop import --connect jdbc:mysql://localhost/abc --table EMPLOYEE

  • Import a subset of the columns from a table:

sqoop import --connect jdbc:mysql://localhost/abc --table EMPLOYEES --columns "employee_id,first_name,age,designation"

  • Import only a few records by specifying them with a WHERE clause:

sqoop import --connect jdbc:mysql://localhost/abc --table EMPLOYEES --where "designation='ADVISOR'"

  • If the table has a primary key defined, we can control the parallelism of the command by explicitly setting the number of mappers with --num-mappers. Sqoop evenly splits the primary key range of the source table, as mentioned above.

sqoop import --connect jdbc:mysql://localhost/abc --table EMPLOYEES --num-mappers 6

  • If there is no primary key defined in the table and no split column is given with --split-by, the data import must be sequential. Specify a single mapper by using --num-mappers 1 or the ‘-m 1’ option for the import; otherwise Sqoop reports an error.

sqoop import --connect jdbc:mysql://localhost/db --username $USER_NAME --password $PASSWORD --table tablename -m 1

  • To try a sample query without importing data, use the eval option to print the results to the command prompt:

sqoop eval --connect jdbc:mysql://localhost/abc --query "SELECT * FROM employees LIMIT 10"

 

Follow for more…Read next article on What is Spark ?

Want to learn how HDFS works  …read here

Want to learn Hadoop Installation…click here !!

Hadoop Ecosystem Components contd…(Tutorial Day 6)

So continuing the old post, here we will discuss some more components of Hadoop ecosystem.

Data Integration or ETL Components of Hadoop Ecosystem

Sqoop (SQL-to-Hadoop) is a big data tool that offers the capability to extract bulk data from non-Hadoop data stores or relational databases (like MySQL, Oracle, Teradata, PostgreSQL), transform the data into a form usable by Hadoop, and then load the data into HDFS, HBase or Hive. This process is similar to Extract, Transform, and Load (ETL). It parallelizes data transfer for fast performance, copies data quickly from external systems to Hadoop, and makes data analysis more efficient.

It’s batch oriented and not suitable for low latency interactive queries. It provides a scalable processing environment for both structured and non-structured data.

Sqoop Import

The import tool imports individual tables from RDBMS to HDFS. Each row in a table is treated as a record in HDFS. All records are stored as text data in text files or as binary data in Avro and Sequence files.

Sqoop Export

The export tool exports a set of files from HDFS back to an RDBMS. The files given as input to Sqoop contain records, which become rows in the table. They are read and parsed into a set of records, delimited by a user-specified delimiter.

Sqoop Use Case-

Coupons.com and Apollo Group use the Sqoop component of the Hadoop ecosystem to transfer data between Hadoop and their data warehouses.

  • Flume-

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming or log file data into the Hadoop Distributed File System (HDFS). It is used for collecting data from its origin and sending it to its resting location (HDFS). Flume accomplishes this by defining data flows that consist of three primary structures: sources, channels and sinks. The processes that run the data flow are known as agents, and the units of data that flow through Flume are known as events.

Flume helps to collect data from a variety of sources, such as logs, JMS, directories, etc.
Multiple Flume agents can be configured to collect high volumes of data.
It scales horizontally and is stream oriented. It provides high throughput and low latency. It is fault tolerant.

Both Sqoop and Flume pull data from a source and push it to a sink. The main difference is that Flume is event driven, while Sqoop is not.

Flume Use Case –

A Twitter source connects through the streaming API and continuously downloads tweets (called events). These tweets are converted into JSON format and sent to the downstream Flume sinks for further analysis of tweets and retweets to engage users on Twitter.
Goibibo uses Flume to transfer logs from production system to HDFS.

Data Storage Component of Hadoop Ecosystem

HBase

HBase is an open source, distributed database built on a sorted map model. It is a column-oriented NoSQL data store, similar to Google’s BigTable. It supports random reads as well as batch computations using MapReduce. With HBase, an enterprise can create large tables with millions of rows and columns on commodity hardware. HBase is best used when there is a requirement for random read or write access to big datasets. An important advantage of HBase is that it supports updates on large tables and fast lookups. The HBase data store supports linear and modular scaling. HBase stores data as a distributed, multidimensional map, and its tables can be processed in parallel by MapReduce jobs.

It is well integrated with Pig, Hive and Sqoop. In terms of the CAP theorem, it is a consistent and partition-tolerant (CP) system.
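
The “sorted, multidimensional map” idea can be illustrated with nested Python dictionaries. This is only a toy model, not the HBase client API: the helper names put, get and scan and the sample row keys are hypothetical, and the layout assumed here is row key, then "family:qualifier" column, then timestamp, then value.

```python
# Toy model: row key -> "family:qualifier" -> timestamp -> value
table = {}


def put(row, column, value, timestamp):
    """Store one versioned cell."""
    table.setdefault(row, {}).setdefault(column, {})[timestamp] = value


def get(row, column):
    """Random read: return the most recent version of a cell, or None."""
    versions = table.get(row, {}).get(column)
    if not versions:
        return None
    return versions[max(versions)]


def scan(start_row, stop_row):
    """Yield rows in sorted key order (start inclusive, stop exclusive)."""
    for row in sorted(table):
        if start_row <= row < stop_row:
            yield row, table[row]


put("user#002", "info:name", "Bob", timestamp=1)
put("user#001", "info:name", "Alice", timestamp=1)
put("user#001", "info:name", "Alicia", timestamp=2)  # newer version of the same cell

print(get("user#001", "info:name"))                    # Alicia
print([row for row, _ in scan("user#001", "user#003")])  # ['user#001', 'user#002']
```

Keeping rows sorted by key is what makes range scans cheap, while the direct lookup by row key corresponds to the random read and write access described above.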

HBase Use Case-

Facebook is one of the largest users of HBase, with its messaging platform built on top of HBase in 2010.

Cassandra

Apache Cassandra is a free and open-source distributed database management system designed to handle large amounts of data across many commodity servers. This database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple data-centers is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.

Use Cases:

For Cassandra, Twitter is an excellent example. We know that, like most sites, user information (screen name, password, email address, etc), is kept for everyone and that those entries are linked to one another to map friends and followers. And, it wouldn’t be Twitter if it weren’t storing tweets, which in addition to the 140 characters of text are also associated with meta-data like timestamp and the unique id that we see in the URLs.

Monitoring, Management and Orchestration Components of Hadoop Ecosystem- Oozie and Zookeeper

  • Oozie-

Oozie is a workflow scheduler in which workflows are expressed as Directed Acyclic Graphs (DAGs). Oozie runs in a Java servlet container (Tomcat) and uses a database to store all the running workflow instances, their states and variables, along with the workflow definitions, in order to manage Hadoop jobs (MapReduce, Sqoop, Pig and Hive). The workflows in Oozie are executed based on data and time dependencies.
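
As a rough illustration of what “a workflow expressed as a DAG” means, the sketch below orders a few actions so that each one runs only after the actions it depends on. It uses Python's standard graphlib module (Python 3.9+) and is not Oozie itself, which defines workflows in XML; the action names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each action maps to the set of actions
# that must finish before it can start.
workflow = {
    "sqoop_import": set(),
    "pig_clean": {"sqoop_import"},
    "hive_report": {"pig_clean"},
    "mapreduce_index": {"pig_clean"},
}

# static_order() yields the actions in an order that respects every dependency.
order = list(TopologicalSorter(workflow).static_order())
print(order)
```

Because the graph is acyclic, such an order always exists; a cycle (for example, pig_clean also depending on hive_report) would make the workflow impossible to schedule, and graphlib would raise a CycleError.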

Oozie Use Case:

The American video game publisher Riot Games uses Hadoop and the open source tool Oozie to understand  the player experience.

  • Zookeeper-

Zookeeper is the king of coordination and provides simple, fast, reliable and ordered operational services for a Hadoop cluster. Zookeeper is responsible for synchronization service, distributed configuration service and for providing a naming registry for distributed systems.

Zookeeper Use Case-

Found by Elastic uses Zookeeper comprehensively for resource allocation, leader election, high-priority notifications and discovery. The entire Found service is built up of various systems that read from and write to Zookeeper.

Several other common Hadoop ecosystem components include: Avro, Cassandra, Chukwa, Mahout, HCatalog, Ambari and Hama. By implementing Hadoop using one or more of the Hadoop ecosystem components, users can personalize their big data experience to meet the changing business requirements. The demand for big data analytics will make the elephant stay in the big data room for quite some time.

Data Serialisation (Data Interchange Protocols)

AVRO: Apache Avro is a language-neutral data serialization system, developed within the Apache Hadoop project. Data serialization is a mechanism to translate data in a computer environment (such as a memory buffer, data structures or object state) into a binary or textual form that can be transported over a network or stored on persistent storage media. Java and Hadoop provide serialization APIs, which are Java based, but Avro is not only language independent but also schema based. Once the data has been transported over the network or retrieved from persistent storage, it needs to be deserialized again. Serialization is also termed marshalling, and deserialization unmarshalling.

Avro uses JSON format to declare the data structures. Presently, it supports languages such as Java, C, C++, C#, Python, and Ruby. Avro has a schema-based system. A language-independent schema is associated with its read and write operations.

Like Avro, there are other serialization mechanisms in Hadoop such as Sequence Files, Protocol Buffers, and Thrift. Avro creates a self-describing file named Avro Data File, in which it stores data along with its schema in the metadata section. Avro is also used in Remote Procedure Calls (RPCs). During RPC, client and server exchange schemas in the connection handshake.

To serialize Hadoop data, there are two ways −

  • You can use the Writable classes, provided by Hadoop’s native library.
  • You can also use Sequence Files which store the data in binary format.

The main drawback of these two mechanisms is that Writables and SequenceFiles have only a Java API; they cannot be written or read in any other language.

Therefore, files created in Hadoop with these two mechanisms cannot be read from any other language, which makes Hadoop a closed box. To address this drawback, Doug Cutting created Avro, which is a language-independent data serialization system.

Use Case:

Content credit : http://www.tutorialspoint.com

Avro provides rich data structures. For example, you can create a record that contains an array, an enumerated type, and a sub record. These datatypes can be created in any language, can be processed in Hadoop, and the results can be fed to a third language.
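
Since Avro schemas are declared in JSON, a record containing an array, an enumerated type and a sub-record can be written as plain JSON text. The sketch below only parses such a schema with Python's standard json module to show its shape; it does not use the Avro library, and the record and field names (Employee, skills, level, address) are made up for illustration.

```python
import json

# Hypothetical Avro schema: a record with an array, an enum and a sub-record.
schema_json = """
{
  "type": "record",
  "name": "Employee",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "skills", "type": {"type": "array", "items": "string"}},
    {"name": "level", "type": {"type": "enum", "name": "Level",
                               "symbols": ["JUNIOR", "SENIOR"]}},
    {"name": "address", "type": {"type": "record", "name": "Address",
                                 "fields": [{"name": "city", "type": "string"}]}}
  ]
}
"""

schema = json.loads(schema_json)

# Collect the kind of each field: a primitive name, or the "type" of a complex type.
field_kinds = {}
for field in schema["fields"]:
    ftype = field["type"]
    field_kinds[field["name"]] = ftype["type"] if isinstance(ftype, dict) else ftype

print(field_kinds)
# {'name': 'string', 'skills': 'array', 'level': 'enum', 'address': 'record'}
```

Because the schema is just JSON, any language with a JSON parser can read it, which is what makes the format language independent.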

 

Thrift:

Thrift is a lightweight, language-independent software stack with an associated code generation mechanism for RPC. Thrift provides clean abstractions for data transport, data serialization, and application level processing. Thrift was originally developed by Facebook and now it is open sourced as an Apache project. Apache Thrift is a set of code-generation tools that allows developers to build RPC clients and servers by just defining the data types and service interfaces in a simple definition file. Given this file as an input, code is generated to build RPC clients and servers that communicate seamlessly across programming languages.

Thrift supports a variety of languages including C++, Java, Python, PHP, Ruby.

To learn more on Hadoop…keep on reading these tutorials…every day we try to get something new and interesting for all my readers !!

Now we will start learning all Ecosystem components in more detail. Click here to read about how MapReduce Algorithm works with an easy example.

Hadoop Ecosystem Components contd…(Tutorial Day 5)

So continuing the old post, vendors that provide Hadoop-based platforms include Cloudera, Hortonworks, MapR, Greenplum, IBM, and Amazon. Here we will discuss more components of Hadoop ecosystem.

Data Access Components of Hadoop Ecosystem

  • Pig-

Apache Pig is a high-level platform for creating programs that run on Apache Hadoop. Apache Pig is a tool developed by Yahoo for analyzing huge data sets efficiently and easily. The high-level data flow language for this platform is called Pig Latin. Pig can execute its Hadoop jobs in MapReduce, Apache Tez, or Apache Spark. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.

At the present time, Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the Hadoop subproject). Pig’s language layer currently consists of a textual language called Pig Latin. Pig is an open source project under the Apache Software Foundation, so you can learn about it online.

Pig Latin is basically used to construct dataflows, for example a scheduled job that periodically crunches massive data from HDFS and transfers the summarized data into a relational database for reporting and ad-hoc analyses. Hive is used for simple ad-hoc analytical queries on data in HDFS, as Hive queries are a lot faster to write for those types of queries. Pig is generally used by Yahoo, Twitter, etc. to process web logs, images, maps, etc.

Usage of Apache Pig:

  • Using Pig Latin, programmers can perform MapReduce tasks easily without having to write complex code in Java, as it uses a multi-query approach, thereby reducing the length of the code. For example, an operation that would require you to type 200 lines of code (LoC) in Java can be done in as few as 10 LoC in Apache Pig. Ultimately Apache Pig reduces the development time by almost 16 times.
  • Pig Latin is a SQL-like language, and it is easy to learn Apache Pig when you are familiar with SQL.
  • Apache Pig provides many built-in operators to support data operations like joins, filters, ordering, etc. In addition, it also provides nested data types like tuples, bags, and maps that are missing from MapReduce.

Pig Use Case-

Here is one of my favourite use cases of the Pig Latin language, which you can read on Slideshare:

Scenario: You have user data in one file and website data in another. Now you want to find the top 5 most visited pages by users aged 18-25. For this scenario, a MapReduce program is a full page of code, but in Pig Latin it is a small, easily understandable script.

[Image: pig_latin-code_example]
Code credit: Nick Dimiduk
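
The dataflow in this scenario (filter users by age, join with page visits, group by page, count, order, keep the top 5) can be sketched in plain Python as follows. This is not the Pig Latin script from the image, only an illustration of the same steps; the sample users and visits and the function name top_pages are made up.

```python
from collections import Counter

# Hypothetical inputs: (user, age) records and (user, url) page visits.
users = [("amy", 22), ("bob", 31), ("cat", 19), ("dan", 25)]
visits = [("amy", "/home"), ("bob", "/home"), ("cat", "/news"),
          ("amy", "/news"), ("dan", "/shop"), ("cat", "/home")]


def top_pages(users, visits, min_age=18, max_age=25, limit=5):
    # FILTER: keep only users in the requested age range.
    young = {name for name, age in users if min_age <= age <= max_age}
    # JOIN: keep the visits made by those users.
    joined = [url for name, url in visits if name in young]
    # GROUP + COUNT: number of visits per page.
    counts = Counter(joined)
    # ORDER + LIMIT: most visited first (ties broken by URL), top `limit` pages.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]


print(top_pages(users, visits))
# [('/home', 2), ('/news', 2), ('/shop', 1)]
```

Each commented step corresponds to one relational operation in the dataflow, which is the style of program Pig Latin is designed to express in a few lines.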

  • Hive-

Hive is a data warehouse layer built on Hadoop. It lets you define a structure for unstructured big data and query the data using a SQL-like language called HiveQL. It was developed by Facebook and makes querying faster through indexing.

Hive Use Case-

Hive simplifies Hadoop at Facebook with the execution of 7500+ Hive jobs daily for Ad-hoc analysis, reporting and machine learning.

 

Read my next blog on more Hadoop ecosystem components (tutorial Day 6)
