- Kafka: a Distributed Messaging System for Log Processing
- Streams and Tables: Two Sides of the Same Coin
Infrastructure as code, otherwise known as programmable infrastructure, is one of the key practices utilized in implementing data projects in the cloud.
In this article I explain what is meant by infrastructure as code, and discuss both the motivations for adopting it and the mechanics of implementing it.
Virtualization has helped to change all that. Suddenly the balance of power shifted from the system administrators who provide these resources to the developers who request these resources for their projects. With virtualized server farms, developers could provision and configure high-end computing nodes for a project in minutes rather than days or weeks.
This trend has only accelerated with the move to cloud computing and the public cloud in the past 5 years.
In public cloud computing, scalable information resources are provided as a service to multiple external customers via the Internet.
In fact, the cloud computing era was made possible only by virtualization, so much so that it is often dubbed Virtualization 2.0.
The top three public cloud platforms are Google Cloud Platform, Amazon Web Services (AWS) and Microsoft Azure.
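Each of them provides a range of services including, but not limited to, virtual servers, durable storage, relational and NoSQL databases, analytics and big data processing, data warehousing, networking and messaging. A more detailed view of the services offered by the various cloud providers is available in the Appendix.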
While such a request can be fulfilled manually via the Google Cloud console each time, doing so is error-prone and leads to inconsistent results. This speaks to the need for an automated approach, and this is where infrastructure as code comes in.
Infrastructure-as-code (IAC) is an approach to the automation of infrastructure based on software development practices.
With the IAC approach, service provisioning and management, along with configuration changes, are expressed in code and applied automatically. This ensures that changes are made consistently and makes them easier to validate.
For example, using a tool such as Ansible, developers can provision an entire set of virtual servers running CentOS, install and configure Spark on them to form a distributed computing cluster, run an entire ETL pipeline as a batch process, and then terminate the cluster.
The idea is that modern tools can manage infrastructure like software and data.
Infrastructure can be managed via version control systems, automated testing libraries and deployment orchestration tools. Software development practices such as continuous integration and delivery (CI/CD) and test-driven development (TDD) can be applied to the management of infrastructure.
Thus IAC is a way to represent your environment in software and configuration files, so that it can be replicated as many times as needed.
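To make the idea concrete, here is a minimal, tool-agnostic Python sketch of the desired-state model that declarative provisioning tools are built around: the environment is declared as data (the part you would keep under version control), and a plan of actions is computed by comparing it with what currently exists. The `VmSpec` class, the `plan` function and the instance names are hypothetical and do not mirror the API of Ansible, Deployment Manager or any other tool.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VmSpec:
    """Hypothetical, minimal description of a virtual machine."""
    machine_type: str
    image_family: str


def plan(desired, current):
    """Return the (action, name) steps needed to turn `current` into `desired`.

    Both arguments map instance names to VmSpec objects.
    """
    steps = []
    for name, spec in sorted(desired.items()):
        if name not in current:
            steps.append(("create", name))
        elif current[name] != spec:
            steps.append(("update", name))
    for name in sorted(current):
        if name not in desired:
            steps.append(("delete", name))
    return steps


# The declared environment (hypothetical names and sizes).
desired = {
    "spark-master": VmSpec("n1-standard-2", "ubuntu-1710"),
    "spark-worker-1": VmSpec("n1-standard-2", "ubuntu-1710"),
}
# What actually exists right now (also hypothetical).
current = {
    "spark-master": VmSpec("n1-standard-1", "ubuntu-1710"),
    "old-vm": VmSpec("n1-standard-1", "ubuntu-1710"),
}

print(plan(desired, current))
# [('update', 'spark-master'), ('create', 'spark-worker-1'), ('delete', 'old-vm')]
print(plan(desired, desired))
# []
```

Running the plan against an environment that already matches the declaration yields no steps, which is what makes it safe to apply the same definition repeatedly and to replicate an environment from it.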
IAC consists of two main components:
IAC tools are often divided into two groups based on their functionality: provisioning tools and configuration management tools.
Most of the more popular configuration management tools, such as Ansible, increasingly offer provisioning capabilities, blurring the distinction between the two groups.
This has led to a robust debate about their capabilities, with some commentators emphasizing the distinction. My opinion is that for more complex infrastructure requirements such a distinction may have merit and may necessitate using a different tool for each capability. However, I don't expect the distinction to last long, as the vendors of configuration management tools will increasingly add features that make their tools just as capable at provisioning and orchestration.
The intention is not to dive in depth into any one tool, but to give the reader an idea of what implementing infrastructure-as-code looks like.
| Service Type | Service Name |
|---|---|
| Virtual servers, scalable computing on demand | Amazon EC2, Google Compute Engine, Azure Virtual Machines |
| Durable Storage | Google Cloud Storage, Amazon S3 |
| Relational Database | Google Cloud SQL, Azure SQL Database |
| Analytics & Big Data processing | Google Cloud Dataproc & Dataflow |
| Data Warehouse | Google BigQuery, Azure SQL Data Warehouse |
| Networking - DNS | Google Cloud DNS, Amazon Route 53, Microsoft Azure DNS |
| Networking - Virtual Private Cloud | Google Cloud VPC, Azure Virtual Network |
| NoSQL Database | Google Cloud Datastore & Bigtable, Azure Cosmos DB |
| Messaging | Google Cloud Pub/Sub, Azure Notification Hubs |
| Deployment/Provisioning | Google Cloud Deployment Manager, Azure Resource Manager |
| Tool | Main Features | Domain-Specific Language (DSL) |
|---|---|---|
| SaltStack | Cloud orchestration and automation; DevOps toolchain workflow automation | |
| AWS CloudFormation | Provisioning | JSON/YAML |
| Google Cloud Deployment Manager | Provisioning | JSON/YAML |
| Azure Resource Manager | Provisioning | JSON |
Quick tip on using gdrive to upload to Google Drive:
gdrive upload <path-to-local-file>
gdrive upload mydir/myfile.txt
This uploads the file to the root folder of your Google Drive, My Drive.
To upload to a specific directory, do the following:
List the directories on Google Drive showing directory ids:
Obtain the id of the directory you wish to upload to, then run:
gdrive upload --parent <id> mydir/myfile.txt
This uploads the file to the directory in question.
You can also search for a specific folder on Google Drive by running:
gdrive list -q "name contains 'BigData'"
This is the first of many posts in a series I am embarking on, chronicling my work using Google Cloud Platform (GCP) to create Big Data applications. Why Google Cloud and not AWS, you may ask? Well, I already use AWS extensively at work, so at home I've decided to do something different to broaden my expertise. On a secondary note, GCP seems to be a cheaper alternative to AWS for compute-intensive workloads, and that matters when you pay for compute resources out of pocket.
I already had a GCP account that I had created a few years back. My first task was to figure out how to create VM instances on GCP, first via the gcloud command-line tool and then via Ansible.
I followed the instructions for setting up the gcloud client on my Ubuntu laptop.
I was subsequently able to create an instance with the following command:
gcloud compute instances create test-instance --image-family ubuntu-1710 --image-project ubuntu-os-cloud
I was prompted to select a zone, and I chose one in the us-east1 region.
My goal is to fully automate the provisioning of resources in GCP, so the next step for me was to figure out how to provision a VM instance using automation software such as Puppet or Ansible.
Ansible is what I am most familiar with from work so Ansible it is.
The simplest and quickest way to get started is by reading the example in the Ansible Google Cloud Platform Guide.
The video is instructive. I then cloned the repo, followed the instructions to create my instance, and was able to create instances via the following:
and subsequently terminate them as follows:
git clone https://github.com/miguelgrinberg/microblog.git
git tag -l
microblog$ git checkout tags/v0.3 -b tag_v0.3
Switched to a new branch 'tag_v0.3'
Word count in single line
rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((count, accum) => count + accum)
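The one-liner splits every line into words (`flatMap`), pairs each word with a count of 1 (`map`), and then sums the counts for each word (`reduceByKey`). The plain-Python sketch below mimics those three steps on an in-memory list, so the semantics can be checked without a cluster. It is a model of what Spark computes, not of how Spark distributes the work, and the `word_count` function is hypothetical.

```python
from functools import reduce
from itertools import chain


def word_count(lines):
    # flatMap: one input line -> many words
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: word -> (word, 1)
    pairs = ((word, 1) for word in words)

    # reduceByKey: combine the values that share a key by adding them
    def combine(acc, pair):
        word, count = pair
        acc[word] = acc.get(word, 0) + count
        return acc

    return reduce(combine, pairs, {})


lines = ["to be or not to be", "that is the question"]
print(word_count(lines))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1, 'that': 1, 'is': 1, 'the': 1, 'question': 1}
```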
Count number of lines in file
Display Spark version
Read csv file into dataframe in spark with header
Read csv file into dataframe in spark without header
First you need to specify a schema:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("Field0", IntegerType()), StructField("Field1", StringType()), StructField("Field2", StringType())])
Then read in the data:
df = spark.read.format('com.databricks.spark.csv').load(csv_path, schema=schema)
Handle commas in embedded quoted strings when reading in data
This is done by specifying an escape option:
df_raw = spark.read.format('com.databricks.spark.csv').load(csv_path, schema=schema, escape='"')
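As I understand Spark's CSV options, commas inside a quoted field are already protected by the default quote character (`"`), while `escape='"'` makes the reader treat a doubled quote (`""`) inside a quoted field as a literal quote rather than the end of the field. This is the RFC 4180 convention that most CSV writers use. Python's standard `csv` module applies the same convention by default, so it is a handy way to check how a problematic line ought to split before loading it into Spark. The sample record is made up for illustration.

```python
import csv
import io

# A made-up record: the second field contains a comma, and the third
# contains a comma plus doubled ("escaped") quotes.
raw = '1,"Smith, John","He said ""hi"", then left"\n'

# csv.reader treats "" inside a quoted field as one literal quote
# (doublequote=True is the default), so each record splits into 3 fields.
rows = list(csv.reader(io.StringIO(raw)))
print(rows)
# [['1', 'Smith, John', 'He said "hi", then left']]
```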
Obtain distinct values of column in Spark DF
Convert list of DataFrame Rows to Python list
student_ids = [x.student_id for x in df_raw.select('student_id').distinct().take(200)]
Filter out rows based on values in list
Save dataframe to csv (only for small datasets)
# Many files
filtered_df.write.csv(output_csv)
# Single file
filtered_df.coalesce(1).write.option("header", "true").csv(output_csv)
Count number of occurrences of composite key in data frame:
Start pyspark in python notebook mode
Display spark dataframe with all columns using pandas
import pandas as pd
pd.options.display.max_columns = None
pd.set_option('max_colwidth', 100)
df.limit(5).toPandas()
s = raw_input('--> ')  # Python 2
s = input('--> ')  # Python 3
Note that __file__ is the pathname of the file from which the module was loaded, if it was loaded from a file.
This is a brief synopsis of how I got the Cloudera Quickstart VM running via Docker on Google Compute Engine (GCE), Google's cloud equivalent of Amazon EC2.
The Cloudera Quickstart VM is a basic “Hadoop-In-A-Box” virtual machine solution which provides a Hadoop ecosystem for developers who wish to quickly test out the basic features of Hadoop without having to deploy an entire cluster. Since it doesn’t entail setting up a cluster, certain features provided by a cluster are missing.
Install gcloud on your local workstation as per these instructions:
Step 1. Create a container optimized VM on GCE:
$ gcloud compute --project "myproject-1439" \
    instances create "quickstart-instance-1" \
    --image container-vm --zone "us-central1-a" \
    --machine-type "n1-standard-2"
Created [https://www.googleapis.com/compute/v1/projects/gcebook-1039/zones/us-central1-a/instances/quickstart-instance-1].
NAME                   ZONE           MACHINE_TYPE   PREEMPTIBLE  INTERNAL_IP  EXTERNAL_IP     STATUS
quickstart-instance-1  us-central1-a  n1-standard-2               10.240.0.2   22.214.171.124  RUNNING
I created an n1-standard-2 VM on GCE, which has 2 vCPUs and 7.5 GB of RAM. Since it uses a container-optimized image, it comes with Docker pre-installed.
Step 3. Let’s check the image size:
$ docker images
REPOSITORY            TAG      IMAGE ID       CREATED        VIRTUAL SIZE
cloudera/quickstart   latest   2cda82941cb7   41 hours ago   6.336 GB
Given that our VM has a total disk size of 10 GB, this is cutting it a bit close in the long term if we wish to install other software.
So let’s create a persistent disk and make that available for storing our docker image:
I created a 200 GB persistent disk, bigdisk1, and attached it to the VM.
Step 4. Switch docker image installation directory to use the big persistent disk.
There are a couple of ways to do this as per this article:
The most trouble-free way, IMO, was to mount the bigdisk1 persistent disk so that it would be available for use by my VM, and move the default Docker image installation directory onto it.
First, create a mountpoint for the bigdisk:
$ sudo mkdir /mnt/bigdisk1
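Next, mount it. On GCE, the raw disk can be found at /dev/disk/by-id/google-<diskid>. Note that a brand-new persistent disk has no filesystem, so it must be formatted (for example with mkfs.ext4) before it can be mounted for the first time: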
$ sudo mount -o discard,defaults /dev/disk/by-id/google-bigdisk1 \ /mnt/bigdisk1
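Finally, with the Docker daemon stopped and the contents of the existing /var/lib/docker directory moved to /mnt/bigdisk1/docker, symlink the new location back to the default image installation directory: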
$ sudo ln -s /mnt/bigdisk1/docker/ /var/lib/docker
Presto, we’re now ready. If we run docker pull on any image, the image will be written to the large persistent disk:
$ ls -ltr /var/lib/docker lrwxrwxrwx 1 root root 21 Apr 9 03:20 /var/lib/docker -> /mnt/bigdisk1/docker/
Step 5. Run the Cloudera Quickstart VM and get access to all the goodies:
$ sudo docker run --hostname=quickstart.cloudera --privileged=true \
    -t -i cloudera/quickstart /usr/bin/docker-quickstart
...
Starting zookeeper ... STARTED
starting datanode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-quickstart.cloudera.out
...
starting rest, logging to /var/log/hbase/hbase-hbase-rest-quickstart.cloudera.out
Started HBase rest daemon (hbase-rest):                    [  OK  ]
starting thrift, logging to /var/log/hbase/hbase-hbase-thrift-quickstart.cloudera.out
Started HBase thrift daemon (hbase-thrift):                [  OK  ]
Starting Hive Metastore (hive-metastore):                  [  OK  ]
Started Hive Server2 (hive-server2):                       [  OK  ]
Starting Sqoop Server:                                     [  OK  ]
Sqoop home directory: /usr/lib/sqoop2
Setting SQOOP_HTTP_PORT:     12000
...
Started Impala Catalog Server (catalogd) :                 [  OK  ]
Started Impala Server (impalad):                           [  OK  ]
[root@quickstart /]#
RDDs, or resilient distributed datasets, are the fundamental data abstraction in Apache Spark. An RDD is essentially a distributed dataset held in memory, partitioned among the various nodes of a Spark cluster.
The Spark API documentation defines an RDD as follows:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
In the seminal paper on RDDs by Matei Zaharia, an RDD is defined as follows:
Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
The Resilient part refers to fault tolerance: if a partition of the dataset is lost because a node goes down, it can be recovered in a timely manner. This recoverability comes from the lineage graph maintained by the driver program, from which the lost partition can be recomputed.
The lineage graph consists of the series of transformations to be applied to each partition of the data, and it is stored in the driver program.
The RDD in effect is a set of instructions which are lazily evaluated.
The evaluation only occurs when an action is encountered.
Here is an example of what an RDD’s lineage graph looks like:
scala> idBudgetRDD.toDebugString
res6: String =
(2) MapPartitionsRDD at map at <console>:35
 |  MapPartitionsRDD at filter at <console>:33
 |  MapPartitionsRDD at map at <console>:31
 |  MapPartitionsRDD at map at <console>:31
 |  MapPartitionsRDD at textFile at <console>:29
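The toDebugString output shows the lineage as a chain of transformations. To make lazy evaluation and recovery concrete, here is a deliberately tiny Python model. The `LazyDataset` class and its methods are hypothetical and are not how Spark is implemented: transformations only append a step to the lineage, nothing runs until an action such as `collect` is called, and because the source data and the lineage are kept, any partition can be recomputed on demand, which is the essence of how a lost partition is recovered.

```python
class LazyDataset:
    """Toy model of an RDD: source partitions plus a lineage of transformations."""

    def __init__(self, partitions, lineage=()):
        self.partitions = partitions   # list of lists: the original input data
        self.lineage = tuple(lineage)  # (name, function) steps, applied lazily

    # Transformations: return a new dataset with a longer lineage; compute nothing.
    def map(self, f):
        step = ("map", lambda part: [f(x) for x in part])
        return LazyDataset(self.partitions, self.lineage + (step,))

    def filter(self, pred):
        step = ("filter", lambda part: [x for x in part if pred(x)])
        return LazyDataset(self.partitions, self.lineage + (step,))

    def compute(self, index):
        # Rebuild one partition by replaying the lineage over its source data.
        part = self.partitions[index]
        for _name, fn in self.lineage:
            part = fn(part)
        return part

    # Action: triggers evaluation of every partition.
    def collect(self):
        return [x for i in range(len(self.partitions)) for x in self.compute(i)]

    def debug_string(self):
        # Most recent transformation first, similar in spirit to toDebugString.
        return " <- ".join(name for name, _ in reversed(self.lineage)) or "source"


calls = []


def double(x):
    calls.append(x)  # records when the function actually runs
    return 2 * x


data = LazyDataset([[1, 2, 3], [4, 5, 6]])  # two partitions
multiples_of_4 = data.map(double).filter(lambda x: x % 4 == 0)

print(calls)                          # [] (nothing has been evaluated yet)
print(multiples_of_4.debug_string())  # filter <- map
print(multiples_of_4.collect())       # [4, 8, 12]
print(multiples_of_4.compute(1))      # [8, 12], recomputed from the lineage
```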
The Distributed part stems from the fact that the data is distributed among the worker nodes of a cluster. The driver program sends the RDD, along with the partition of the data that should be computed, to each worker node. The process on the worker node that is responsible for executing the set of instructions encapsulated in the RDD is called the Executor. The exact call is as follows, in
@DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T]
The Dataset part expresses the fact that we are processing a collection of data, albeit one that is partitioned.
Again, taking a look at the source code, in
we see:
/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]
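`getPartitions` and `compute` split the work between them: the first decides how the dataset is divided, and the second produces the elements of a single partition, which is what lets each executor work on its own split independently. The Python sketch below models that contract under stated assumptions. `ListDataset`, `get_partitions` and `compute` are hypothetical stand-ins, the even start/end slicing is my understanding of how `sc.parallelize` splits a local collection, and a thread pool plays the role of the executors.

```python
from concurrent.futures import ThreadPoolExecutor


class ListDataset:
    """Toy stand-in for an RDD over a local list, split into num_slices partitions."""

    def __init__(self, data, num_slices):
        self.data = list(data)
        self.num_slices = num_slices

    def get_partitions(self):
        # Each partition is described only by its index and (start, end) bounds.
        n = len(self.data)
        return [(i, i * n // self.num_slices, (i + 1) * n // self.num_slices)
                for i in range(self.num_slices)]

    def compute(self, split):
        # Produce the elements of one partition, independently of all others.
        _index, start, end = split
        return iter(self.data[start:end])


ds = ListDataset(range(10), num_slices=3)
splits = ds.get_partitions()
print(splits)  # [(0, 0, 3), (1, 3, 6), (2, 6, 10)]

# Each "executor" sums its own partition; the "driver" combines the partial results.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_sums = list(pool.map(lambda s: sum(ds.compute(s)), splits))
print(partial_sums, sum(partial_sums))  # [3, 12, 30] 45
```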
RDDs support 2 types of operations: Transformations and Actions.
Transformations create a new RDD from an existing one (RDDs are immutable, so the original is left unchanged).
They are lazily evaluated, meaning that they are only executed when an action requires their result.
Actions trigger the execution of the chain of operations that result in data being returned. They are necessary for transformations to be evaluated. Without an action, an RDD is just a chain of transformations that are yet to be evaluated.
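An RDD is characterized by the following five properties:
- A list of partitions
- A function for computing each split (partition)
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (for example, to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations on which to compute each split (for example, the block locations of an HDFS file)
The RDD lives in the SparkContext of the driver program, which in a cluster setup often runs on the master node.
It is uniquely identified by an id: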
scala> val exampleRDD = sc.parallelize(List(1,2,3,4,5))
exampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD at parallelize at <console>:27
scala> exampleRDD.id
res34: Int = 12
scala> val example2RDD = sc.parallelize(List(1,2,3,4,5))
example2RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD at parallelize at <console>:27
scala> example2RDD.id
res36: Int = 13
scala> val example3RDD=exampleRDD
example3RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD at parallelize at <console>:27
scala> example3RDD.id
res37: Int = 12
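The session shows that each newly created RDD receives the next id from its SparkContext, while `val example3RDD = exampleRDD` merely creates a second reference to the same RDD, so its id is unchanged. A minimal Python model of that behaviour follows; the `Context` and `Dataset` classes are hypothetical, and the counter starts at 12 only to mirror the session above.

```python
from itertools import count


class Context:
    """Hands out unique, increasing ids, like the counter kept by a SparkContext."""

    def __init__(self, first_id=0):
        self._ids = count(first_id)

    def new_id(self):
        return next(self._ids)

    def parallelize(self, data):
        return Dataset(self, list(data))


class Dataset:
    def __init__(self, ctx, data):
        self.data = data
        self.id = ctx.new_id()  # assigned once, when the dataset is created


sc = Context(first_id=12)
example = sc.parallelize([1, 2, 3, 4, 5])
example2 = sc.parallelize([1, 2, 3, 4, 5])
example3 = example  # a new name for the same object, not a new dataset
print(example.id, example2.id, example3.id)  # 12 13 12
```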
primesRDD = sc.parallelize([2,3,5,7,11,13,17])
gettysburgRDD = sc.textFile("hdfs:///user/dataphantik/gettysburg.txt")
errorsRDD = baseRDD.filter(lambda line: 'Error' in line)
The base class of all RDDs, org.apache.spark.rdd.RDD, is an abstract class from which all concrete RDD classes inherit.
The base RDD classes are:
There are also depictions of