Donnerstag, 15. September 2016

Cloudera Manager and Slack

The most of us are getting bored by receiving hundreds of monitoring emails every day. To master the flood, rules are getting in play - and with that rules the interest into email communication are reduced.
To master the internal information flood, business messaging networks like Slack are taking more and more place.

To make CM work with Slack a custom alert script from my Github will do the trick:

https://github.com/alo-alt/Slack/blob/master/cm2slack.py

The use is pretty straight forward - create a channel in Slack, enable Webhooks, place the token into the script, store the script on your Cloudera Manager host, make it executable for cloudera-scm : and enable outgoing firewall / proxy rules to let the script chat with Slack's API. The script can handle proxy connections, too.

In Cloudera Manager, the script path needs to be added into Cloudera-Management-Service => Configuration => Alert Publisher => Custom Script.



Dienstag, 16. August 2016

Manage rights in OpenStack

Openstack lacks on sophisticated rights management, the most users figure. But that's not the case, role management in Openstack is available.
First users and groups needs to be added to projects, this can be done per CLI or GUI [1]. Lets say, a group called devops shall have the full control about OpenStack, but others not in that group can have dedicated operation access like create snapshot, stop / start / restart an instance or looking at the floating IP pool.

Users, Groups and Policies
OpenStack handles the rights in a policy file in /etc/nova/policy.json, using roles definitions per group assigned to all tasks OpenStack provides. It looks like:

{
"context_is_admin": "role:admin",
"admin_or_owner": "is_admin:True or project_id:%(project_id)s",
"default": "rule:admin_or_owner",
...
}

It describes the default - an member of a project is the admin of that project. To add additional rules, they have to be defined here.
In my case, I created a goup devops, added the users and defined the rights like:

"devops": "is_admin:True or (project_id:%(project_id)s and not role:user and not role:guest)",

and assigned the role to all tasks, an DevOps team member should be able to perform. Project owners / admins can operate with instances in her project, but in an controlled way. Basically, I revoked all delete / move / reassign tasks, like network and subnet management or delete an instance.
The full policy file is available in my GitHub repo [2].

[1] http://docs.openstack.org/admin-guide/cli-manage-projects-users-and-roles.html
[2] https://github.com/alo-alt/OpenStack/blob/master/policy.json

Montag, 4. Juli 2016

Deal with corrupted messages in Apache Kafka

Under some strange circumstances it can happen that a message in a Kafka topic is corrupted. This happens often by using 3rd party frameworks together with Kafka. Additionally, Kafka < 0.9 has no lock at Log.read() at the consumer read level, but has a lock on Log.write(). This can cause a rare race condition, as described in KAKFA-2477 [1]. Probably a log entry looks like:

ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$) kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy

Kafka-Tools

Kafka stores the offset of every consumer in Zookeeper. To read out the offsets, Kafka provides handy tools [2]. But also zkCli.sh can be used, at least to display the consumer and the stored offsets. First we need to find the consumer for a topic (> Kafka 0.9):

bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9 the only possibility to get this informations was to use zkCli.sh (or similar tools) to find the consumer group. Since the debug with zkCli is a bit frustrating, I personally use kafka-manager from Yahoo [3]. 
Let's assume the consumers are stored in Zookeeper under /consumer, the command to find the offset looks like:

ls /consumer/test/offsets
[1]
get /consumer/test/offsets/1
[15]

With Kafka that command would look like:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181

Group     Topic   Pid   Offset   logSize   Lag   Owner
console-1 test    1     15       337       326   none


After the offset was found, this offset can be incremented to force the consumer to read the next available message. Before doing this, Kafka has to be shutdown. 

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest 16 test

After restart, Kafka should be able read the next message, in the case this message isn’t corrupted, too. And yes, the corrupted message is lost and can’t be restored, so it's always a good idea to implement a CRC check before any message gets to Kafka.

A code based approach is also available [4]. For that a subclass of the ConsumerIterator has to be created, which will catch the message exception, replace it with a dummy message and proceed with the next message. Of course the corrupted message is lost in that case, too.

Montag, 27. Juni 2016

Encryption in HDFS

Encryption of data was and is the hottest topic in terms of data protection and prevention against theft. Hadoop HDFS supports full transparent encryption in transit and at rest [1], based on Kerberos implementations [2], often used within multiple trusted Kerberos domains.

Technology

Hadoop KMS provides a REST-API, which has built-in SPNEGO and HTTPS support, comes mostly bundled with a pre-configured Apache Tomcat within your preferred Hadoop distribution. 
To have encryption transparent for the user and the system, each encrypted zone is associated with a SEZK (single encryption zone key), created when the zone is defined as an encryption zone by interaction between NN and KMS. Each file within that zone will have its own DEK (Data Encryption Key). This behavior is fully transparent, since the NN directly asks the KMS for a new EDEK (encrypted data encryption key) encrypted with the zones key and adds them to the file’s metadata when a new file is created.

When a client wants to read a file in an encrypted zone, the NN provides the EDEK together with a zone key version and the client asks the KMS to decrypt the EDEK. If the client has permissions to read that zone (POSIX), the client will use the provided DEK to read the file. Seen from a DFS node perspective, that datastream is encrypted and the nodes only see an encrypted data stream. 

Setup and Use

I use here Cloudera’s CDH as example, but the same would work with other distributions and for sure with the official Apache Hadoop distribution. Enabling KMS in CDH (5.3.x and up) it's pretty easy, and doesn’t need to be explained here since Cloudera has great articles online about that process [3]. Important to know is only that KMS doesn’t work without a working Kerberos implementation. Additionally, there are other configuration parameters which need to be known, especially in a multi-domain Kerberos environment.
First, KMS uses the same rule based mechanism as HDFS uses when a trusted kerberos environment is used. That means the same filtering rules as existent in core-site.xml need to be added to kms-site.xml to get the encryption for all trusted domains working. This has to be done per:

<property>
 <name>hadoop.kms.authentication.kerberos.name.rules</name>
  <value>RULE:[1:$1@$0](.*@\QTRUSTED.DOMAIN\E$)s/@\QTRUSTED.DOMAIN\E$//
RULE:[2:$1@$0](.*@\QTRUSTED.DOMAIN\E$)s/@\QTRUSTED.DOMAIN\E$//
RULE:[1:$1@$0](.*@\QMAIN.DOMAIN\E$)s/@\QMAIN.DOMAIN\E$//
RULE:[2:$1@$0](.*@\QMAIN.DOMAIN\E$)s/@\QMAIN.DOMAIN\E$//
DEFAULT</value>
</property>


per kms-site.xml. The terms trusted.domain / main.domain are placeholders, describing the original and the trusted kerberos domain. The use from an administrative standpoint is straightforward:
hadoop key create KEYNAME #(one time key creation)
hadoop fs -mkdir /enc_zones/data
hdfs crypto -createZone -keyName KEYNAME -path /enc_zones/data
hdfs crypto -listZones


First I create a key, then I create the directory I want to encrypt in HDFS and encrypt this with the key I created first. 
This directory is now only accessible by me or users I give access per HDFS POSIX permissions. Others aren’t able to change or read files. To give superusers the possibility to create backups without de- and encrypt, a virtual path prefix for distCp (/.reserved/raw) [4] is available. This prefix allows the block-wise copy of encrypted files, for backup and DR reasons.

The use of distCp for encrypted zones can cause some mishaps. Highly recommended is to have identical encrypted zones on both sides to avoid problems later. A potential distCp command for encrypted zones could look like:

hadoop distcp -px hdfs://source-cluster-namenode:8020/.reserved/raw/enc_zones/data hdfs://target-cluster-namenode:8020/.reserved/raw/enc_zones/data

Samstag, 4. Juni 2016

Open Source based Hyper-Converged Infrastructures and Hadoop

According to a report from Simplivity [1] Hyper-Converged Infrastructures are used by more than 50% of the interviewed businesses, tendentious increasing. But what does this mean for BigData solutions, and Hadoop especially? What tools and technologies can be used, what are the limitations and the gains from such a solution?

To build a production ready and reliable private cloud to support Hadoop clusters as well as on-demand and static I have made great experience with OpenStack, Saltstack and the Sahara plugin for Openstack.
Openstack supports Hadoop-on-demand per Sahara, it's also convenient to use VM's and install a Hadoop Distribution within, especially for static clusters with special setups. The Openstack project provides ready to go images per [2], as example for Vanilla 2.7.1 based Hadoop installations. As an additional benefit, Openstack supports Docker [3], which adds an additional layer of flexibility for additional services, like Kafka [4] or SolR [5].

Costs and Investment
The costs of such an Infrastructure can vary, depending on the hardware and future strategy. Separate compute and storage nodes have been proven in the past, and should be used in future, too. The benefits outweigh the limitations, mostly end up in having move bare metal servers than in a high packed (compute and storage in one server) environment. Additionally, a more stretched environment
helps to balance peaks and high usage better than packed servers. A typical setup would have 2 controller nodes (for HA reasons), a decent count on compute nodes (high memory and CPU count) and several storage nodes (1 CPU, 8 or 16GB RAM and plenty JBOD (just a bunch of disks)). Those storage nodes should have 2 LVM’s (or raids, if that feels better) to avoid later conflicts with production and development / staging / QA buildouts.

Technology
Hadoop itself has some limitations, especially in Hyper-Converged Infrastructures, given by the demand on data locality for batch processes (MapReduce). In a typical cloud environment, like Sahara is providing in Openstack, the storage area is virtualized, and all data is transferred over the network stack. This can be avoided by using VM images for a persistent Hadoop cluster, as a production one mostly is. The data storage (HDFS) will then be provided within the VM and can be extended by mounting additional volumes to the VM (partitions for the data nodes, for example). In both implementations, Cloud based by Sahara and VM, the use of HDFS caching [6] is recommended. This will dramatically speed up the platform for analytical workloads by using columnar based storage formats like Parquet or Kudu [7], together with Hive on Spark [8]. To identify bottlenecks analyzer like Dr. Elephant [9] are very useful and recommended.

Hadoop on demand provides much more flexibility as a static cluster has, especially in terms of load peaks, dynamical resource allocation and cost efficiency. But there are some points to consider. The first and most important one is the separation of block storage and computing. Hadoop itself works with different other distributed filesystems, like ceph [10], but those often rely on Hadoop 1 (MRv1) and Yarn and MRv2 aren’t supported (yet).
The best solution here is to use the standard HDFS layer over cinder [11], which provides good performance with reliability and decent IOpS. The second, and also important one is the network layer. Every compute and storage node should have at least bonded 1GB uplinks, 10G are better (but more expensive). The network needs to be separated into front- and backend. The front-end link provides accessibility to the services the cluster provides to its users, and the back-end provides inter-cluster-communication only. As a third point the use of in-memory filesystems like Alluxio [12] (former Tachyon) may be considered, especially for research clusters, like Genome calculation or NRT applications with high ingestion rates of small data points, like IoT devices typically do.
With these points in mind, streaming based applications getting the most out of this approach, given by the high flexibility and the availability to deal with large load peaks by adding computing resources dynamically. 

Conclusion
Using Hyper-Converged Infrastructures in the world of BigData tools is trending now and proves the success of the private cloud idea. Large companies like LinkedIN, Google, Facebook are on this road since years, and the success outweighs the implementation and maintenance considerations.

List of tools used in this article
Openstack:
Sahara:

Saltstack - Openstack:

Links and References: