Wednesday, October 12, 2016

Shifting paradigms in the world of BigData

In building the next generation of applications, companies and stakeholders need to adopt new paradigms. The need for this shift is predicated on the fundamental belief that building a new application at scale requires solutions tailored to that application’s unique challenges, business model and ROI. Some things change, and I’d like to point to some of those changes.

Event Driven vs. CRUD
Software development has traditionally been driven by entity-relationship modeling and CRUD operations on that data. The modern world isn’t about data at rest, it’s about being responsive to events in flight. This doesn’t mean that you don’t have data at rest, but that this data shouldn’t be organized in silos.
The traditional CRUD model is neither expressive nor responsive, given the countless data sources available today. All data is structured somehow, but an RDBMS needs to know the schema before it can store the data (schema on write), so it can't store or work with data whose schema isn't known. That makes the use of additional freely available data more of an adventure than a valid business model, since the schema isn't known and can change rapidly. Event-driven approaches are much more dynamic and open, and they make the data valuable for other processes and applications. The view of the data is defined by how the data is used (schema on read). These views can be created manually (by a data scientist), automatically (with Hive and Avro, for example) or exploratively (R, AI, NNW).
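
To make the schema-on-read idea concrete, here is a minimal Python sketch. The event records, field names and the read_with_schema helper are invented for illustration and not taken from any real system; the point is only that raw events are kept as they arrived and each consumer projects its own view at read time.

```python
import json

# Raw events are stored exactly as they arrived; nothing is rejected
# because a field is missing or unexpected. (Hypothetical sample data.)
RAW_EVENTS = [
    '{"user": "alice", "action": "login", "ts": 1476259200}',
    '{"user": "bob", "action": "purchase", "amount": 19.99, "ts": 1476259260}',
    '{"sensor": "t-17", "temperature": 21.5, "ts": 1476259300}',
]


def read_with_schema(raw_lines, fields):
    """Yield only the events that carry all requested fields,
    projected onto exactly those fields (the 'schema' of this reader)."""
    for line in raw_lines:
        event = json.loads(line)
        if all(name in event for name in fields):
            yield {name: event[name] for name in fields}


# Two consumers define two different views over the same raw data.
user_view = list(read_with_schema(RAW_EVENTS, ["user", "action"]))
sensor_view = list(read_with_schema(RAW_EVENTS, ["sensor", "temperature"]))
```

Neither view required the data to be reshaped at write time, and a third consumer could add another projection later without touching the stored events.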

Centralized vs Siloed Data Stores
BigData projects often fail because they don't use a centralized data store, often referred to as a Data Lake or Data Hub. It’s essential to understand the idea of a Data Lake and the need for it. Siloed solutions (aka data warehouse solutions) hold only the data that matches their schema and nothing else. Every schema is different, and often it’s impossible to use them in new analytic applications. In a Data Lake the data is stored as it is - original, untouched, uncleaned, disaggregated. That makes the entry (the low-hanging fruit) fairly easy - just start catching all the data you can get. Offload RDBMS and DWs to your Hadoop cluster and start the journey by playing with that data, even with 3rd party tools instead of developing your own tailored apps. Even when this data comes from different DWHs, mining and correlating it often brings treasures to light.

Scaled vs. Monolith Development
Custom processing at scale involves tailored algorithms, be they custom Hadoop jobs, in-memory approaches for matching and augmentation, or 3rd party applications. Hadoop is nothing more (or less) than a framework which allows the user to work within a distributed system, splitting workloads into smaller tasks and letting those tasks run on different nodes. The interface to that system is a set of reusable APIs and libraries. That makes the use of Hadoop so convenient - the user doesn't need to take care of the distribution of tasks or know exactly how the framework works. Additionally, every piece of written code can be reused by others without building up large code debt.
On the other hand, Hadoop gives the user an interface to configure the framework dynamically at runtime to match the application's needs, instead of relying on static configuration as traditional processing systems do.

Keeping these principles in mind when planning and architecting new applications based on Hadoop or similar technologies doesn’t guarantee success, but it lowers the risk of getting lost. It's worth noting that every success has had many failures before it. Not trying to create something new is the biggest mistake we can make, and will sooner or later result in a total loss.

Thursday, September 15, 2016

Cloudera Manager and Slack

Most of us are tired of receiving hundreds of monitoring emails every day. To master the flood, filter rules come into play - and with those rules, the attention paid to email communication drops.
To master the internal information flood, business messaging networks like Slack are becoming more and more common.

To make CM work with Slack, a custom alert script from my GitHub will do the trick.

Using it is pretty straightforward - create a channel in Slack, enable Webhooks, place the token into the script, store the script on your Cloudera Manager host, make it executable for cloudera-scm, and enable outgoing firewall / proxy rules to let the script talk to Slack's API. The script can handle proxy connections, too.

In Cloudera Manager, the script path needs to be added under Cloudera-Management-Service => Configuration => Alert Publisher => Custom Script.
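
For readers who want to see the moving parts, the following Python sketch shows roughly what such an alert script has to do. It is not the script from my GitHub. It assumes that Cloudera Manager calls the script with one argument, the path to a file holding a JSON list of alerts, and that each alert carries its text under body -> alert -> content; check both against your CM version. WEBHOOK_URL is a placeholder, and build_payload / post_to_slack are names made up for this example.

```python
#!/usr/bin/env python3
import json
import sys
import urllib.request

# Placeholder: replace with the Incoming Webhook URL of your Slack channel.
WEBHOOK_URL = "https://hooks.slack.com/services/REPLACE/WITH/TOKEN"


def build_payload(alerts):
    """Turn a list of alert dicts into a Slack message payload.

    Assumption: the alert text lives under body -> alert -> content.
    Alerts without that field are rendered with a fallback line.
    """
    lines = []
    for alert in alerts:
        content = alert.get("body", {}).get("alert", {}).get("content")
        lines.append(content if content else "(alert without content)")
    return {"text": "\n".join(lines)}


def post_to_slack(payload, url=WEBHOOK_URL):
    """POST the payload as JSON to the Slack webhook and return the HTTP status."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # urllib picks up proxies from the http_proxy / https_proxy environment.
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


def main(alert_file):
    with open(alert_file) as handle:
        alerts = json.load(handle)
    post_to_slack(build_payload(alerts))


# Only talk to Slack when called with exactly one argument (the alert file).
if __name__ == "__main__" and len(sys.argv) == 2:
    main(sys.argv[1])
```

Keeping the payload construction separate from the HTTP call makes it easy to try the formatting locally with a hand-written alert file before wiring the script into the Alert Publisher.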

Tuesday, August 16, 2016

Manage rights in OpenStack

Most users assume that OpenStack lacks sophisticated rights management. But that's not the case; role management in OpenStack is available.
First, users and groups need to be added to projects; this can be done via CLI or GUI [1]. Let's say a group called devops shall have full control over OpenStack, while others not in that group get dedicated operational access, like creating a snapshot, stopping / starting / restarting an instance or looking at the floating IP pool.

Users, Groups and Policies
OpenStack handles rights in a policy file, /etc/nova/policy.json, which assigns role-based rules per group to all tasks OpenStack provides. It looks like:

"context_is_admin": "role:admin",
"admin_or_owner": "is_admin:True or project_id:%(project_id)s",
"default": "rule:admin_or_owner",

It describes the default - a member of a project is the admin of that project. Additional rules have to be defined here.
In my case, I created a group devops, added the users and defined the rights like:

"devops": "is_admin:True or (project_id:%(project_id)s and not role:user and not role:guest)",

and assigned the rule to all tasks a DevOps team member should be able to perform. Project owners / admins can operate on instances in their project, but in a controlled way. Basically, I revoked all delete / move / reassign tasks, like network and subnet management or deleting an instance.
The full policy file is available in my GitHub repo [2].
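
To show how the devops rule above reads, here is a plain-Python paraphrase. It is only an illustration of the rule's logic, not OpenStack code: the function name and the creds / target dictionaries are assumptions modelled on how policy checks compare the caller's credentials with the target object.

```python
def devops_rule(creds, target):
    """Paraphrase of:
    is_admin:True or (project_id:%(project_id)s and not role:user and not role:guest)
    """
    # is_admin:True -> admins always pass.
    if creds.get("is_admin"):
        return True
    # project_id:%(project_id)s -> caller and target share the same project.
    same_project = creds.get("project_id") == target.get("project_id")
    # role:<name> -> the caller holds that role (compared case-insensitively here).
    roles = {role.lower() for role in creds.get("roles", [])}
    return same_project and "user" not in roles and "guest" not in roles


# Hypothetical callers acting on an instance in project "p1".
instance = {"project_id": "p1"}
devops_member = {"is_admin": False, "project_id": "p1", "roles": ["devops"]}
plain_user = {"is_admin": False, "project_id": "p1", "roles": ["user"]}
```

With these sample callers, the devops member passes the rule while the plain user does not; which tasks that applies to depends on where the rule is assigned in the policy file.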


Monday, July 4, 2016

Deal with corrupted messages in Apache Kafka

Under some strange circumstances it can happen that a message in a Kafka topic is corrupted. This often happens when 3rd party frameworks are used together with Kafka. Additionally, Kafka < 0.9 has no lock at the consumer read level, but has a lock on Log.write(). This can cause a rare race condition, as described in KAFKA-2477 [1]. A typical log entry looks like:

ERROR Error processing message, stopping consumer: ($) kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy)


Kafka stores the offset of every consumer in Zookeeper. To read out the offsets, Kafka provides handy tools [2]. zkCli can also be used, at least to display the consumers and the stored offsets. First we need to find the consumer for a topic (Kafka 0.9 and later):

bin/ --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9 the only way to get this information was to use zkCli (or similar tools) to find the consumer group. Since debugging with zkCli is a bit frustrating, I personally use kafka-manager from Yahoo [3].
Let's assume the consumers are stored in Zookeeper under /consumer; the commands to find the offset look like:

ls /consumer/test/offsets
get /consumer/test/offsets/1

With Kafka that command would look like:

bin/ --group console-1 --zookeeper zknode1:2181

Group     Topic   Pid   Offset   logSize   Lag   Owner
console-1 test    1     15       337       326   none

Once the offset has been found, it can be incremented to force the consumer to read the next available message. Before doing this, Kafka has to be shut down.

bin/ latest 16 test

After the restart, Kafka should be able to read the next message, provided that message isn’t corrupted, too. And yes, the corrupted message is lost and can’t be restored, so it's always a good idea to implement a CRC check before any message gets to Kafka.
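
One way to implement such a check on the application side is sketched below in Python. This is an assumption about how a producer and consumer could agree on a checksum, independent of Kafka's internal CRC: the payload is prefixed with its CRC32 before it is sent and verified after it is read. The function names and the 4-byte prefix layout are made up for this example.

```python
import struct
import zlib


def wrap_with_crc(payload: bytes) -> bytes:
    """Prefix the payload with its CRC32 as a 4-byte big-endian integer."""
    return struct.pack(">I", zlib.crc32(payload) & 0xFFFFFFFF) + payload


def unwrap_and_verify(message: bytes) -> bytes:
    """Return the payload, or raise ValueError if the checksum does not match."""
    if len(message) < 4:
        raise ValueError("message too short to contain a CRC32 prefix")
    (stored,) = struct.unpack(">I", message[:4])
    payload = message[4:]
    computed = zlib.crc32(payload) & 0xFFFFFFFF
    if stored != computed:
        raise ValueError(f"crc mismatch: stored={stored}, computed={computed}")
    return payload


# Producer side: wrap before handing the bytes to the Kafka client.
message = wrap_with_crc(b"hello kafka")
# Consumer side: verify before processing.
payload = unwrap_and_verify(message)
```

A consumer that gets a ValueError here can log and skip the message, or route it to a separate topic, instead of stopping.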

A code-based approach is also available [4]. For that, a subclass of the ConsumerIterator has to be created, which catches the message exception, replaces the corrupted message with a dummy message and proceeds with the next message. Of course the corrupted message is lost in that case, too.
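
The idea behind that approach can be sketched in a few lines of Python. This is not the Scala ConsumerIterator subclass from [4], only an illustration of the pattern: CorruptMessageError stands in for kafka.message.InvalidMessageException, and FakeLog is a hypothetical message source that raises on corrupt entries.

```python
class CorruptMessageError(Exception):
    """Stand-in for kafka.message.InvalidMessageException."""


class FakeLog:
    """Hypothetical message source: entries given as None count as corrupt."""

    def __init__(self, entries):
        self._entries = iter(entries)

    def __iter__(self):
        return self

    def __next__(self):
        entry = next(self._entries)  # StopIteration ends the stream
        if entry is None:
            raise CorruptMessageError("Message is corrupt")
        return entry


class SkippingIterator:
    """Wrap an iterator; hand out a dummy message where the inner one fails."""

    def __init__(self, inner, dummy):
        self._inner = inner
        self._dummy = dummy
        self.skipped = 0

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._inner)
        except CorruptMessageError:
            # The corrupted message is lost; count it and carry on.
            self.skipped += 1
            return self._dummy


consumer = SkippingIterator(FakeLog(["a", None, "b"]), dummy="<dummy>")
messages = list(consumer)
```

Counting the skipped messages makes the loss visible in monitoring instead of hiding it behind the dummy.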

Monday, June 27, 2016

Encryption in HDFS

Encryption of data was and is the hottest topic in terms of data protection and theft prevention. Hadoop HDFS supports fully transparent encryption in transit and at rest [1], based on Kerberos implementations [2], often used within multiple trusted Kerberos domains.


Hadoop KMS provides a REST API with built-in SPNEGO and HTTPS support, and mostly comes bundled with a pre-configured Apache Tomcat in your preferred Hadoop distribution.
To keep encryption transparent for the user and the system, each encrypted zone is associated with a SEZK (single encryption zone key), created through interaction between NN and KMS when the zone is defined as an encryption zone. Each file within that zone has its own DEK (Data Encryption Key). This behavior is fully transparent: when a new file is created, the NN directly asks the KMS for a new EDEK (encrypted data encryption key), encrypted with the zone's key, and adds it to the file’s metadata.

When a client wants to read a file in an encrypted zone, the NN provides the EDEK together with a zone key version, and the client asks the KMS to decrypt the EDEK. If the client has permission to read that zone (POSIX), it uses the provided DEK to read the file. Seen from a DFS node's perspective nothing changes: the nodes only ever see an encrypted data stream.
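
The key handling described above can be walked through with a toy model in Python. Everything here is a simplified assumption for illustration: ToyKMS and toy_cipher are invented names, the XOR keystream is not secure and only stands in for the real cipher, and zone key versions, authentication and permission checks are left out.

```python
import hashlib
import os


def toy_cipher(key: bytes, data: bytes) -> bytes:
    """XOR data with a SHA-256-derived keystream.

    Symmetric (the same call encrypts and decrypts) and NOT secure;
    it only stands in for the real encryption algorithm.
    """
    stream = b""
    counter = 0
    while len(stream) < len(data):
        stream += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(a ^ b for a, b in zip(data, stream))


class ToyKMS:
    """Holds the zone keys; they never leave this object."""

    def __init__(self):
        self._zone_keys = {}

    def create_zone_key(self, name):
        self._zone_keys[name] = os.urandom(32)

    def generate_edek(self, key_name):
        """Create a fresh DEK and return it encrypted with the zone key."""
        dek = os.urandom(32)
        return toy_cipher(self._zone_keys[key_name], dek)

    def decrypt_edek(self, key_name, edek):
        """Return the plain DEK for an EDEK of the given zone."""
        return toy_cipher(self._zone_keys[key_name], edek)


kms = ToyKMS()
kms.create_zone_key("zone-key")

# NameNode side: on file creation, fetch an EDEK and keep it in the file metadata.
file_metadata = {"key_name": "zone-key", "edek": kms.generate_edek("zone-key")}

# Client side (write): get the DEK via the KMS, encrypt before data reaches the nodes.
dek = kms.decrypt_edek(file_metadata["key_name"], file_metadata["edek"])
stored_blocks = toy_cipher(dek, b"sensitive record")

# Client side (read): same EDEK from the metadata, same DEK, decrypt locally.
dek_again = kms.decrypt_edek(file_metadata["key_name"], file_metadata["edek"])
plaintext = toy_cipher(dek_again, stored_blocks)
```

In this model the storage side only ever holds stored_blocks and the EDEK, which mirrors why the data nodes see nothing but encrypted bytes.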

Setup and Use

I use Cloudera’s CDH as an example here, but the same works with other distributions and, of course, with the official Apache Hadoop distribution. Enabling KMS in CDH (5.3.x and up) is pretty easy and doesn’t need to be explained here, since Cloudera has great articles online about that process [3]. The only important thing to know is that KMS doesn’t work without a working Kerberos implementation. Additionally, there are other configuration parameters which need to be known, especially in a multi-domain Kerberos environment.
First, KMS uses the same rule-based mechanism as HDFS when a trusted Kerberos environment is used. That means the same filtering rules that exist in core-site.xml need to be added to kms-site.xml to get encryption working for all trusted domains.

The terms trusted.domain / main.domain in those rules are placeholders, describing the original and the trusted Kerberos domain. The use from an administrative standpoint is straightforward:
hadoop key create KEYNAME #(one time key creation)
hadoop fs -mkdir /enc_zones/data
hdfs crypto -createZone -keyName KEYNAME -path /enc_zones/data
hdfs crypto -listZones

First I create a key, then I create the directory I want to encrypt in HDFS and turn it into an encryption zone with that key.
This directory is now only accessible by me or by users I give access to via HDFS POSIX permissions. Others aren’t able to change or read files. To let superusers create backups without decrypting and re-encrypting the data, a virtual path prefix for distCp (/.reserved/raw) [4] is available. This prefix allows the block-wise copy of encrypted files, for backup and DR reasons.

Using distCp with encrypted zones can cause some mishaps. It is highly recommended to have identical encrypted zones on both sides to avoid problems later. A potential distCp command for encrypted zones could look like:

hadoop distcp -px hdfs://source-cluster-namenode:8020/.reserved/raw/enc_zones/data hdfs://target-cluster-namenode:8020/.reserved/raw/enc_zones/data