Data Engineering Cookbook

This site is hosted by Helmut Zechmann. I use it to publish data engineering related HOWTOs and code snippets.

Loops For Oozie Workflows

The Oozie workflow scheduler provides a language for defining workflows as directed acyclic graphs. There is no built-in support for loops, since loops would create cycles in the workflow graph. But in some scenarios it can be helpful to iterate over a list of items of unknown length. While Oozie does not offer direct support for loops, they can be simulated by recursive calls using a sub-workflow action.

The basic idea is that a workflow calls itself again using a sub-workflow action. I’ll illustrate this with a small example in which we process a list of files of configurable length. The files need to be specified as input_file_1, input_file_2, ….

Credits

This is not my own idea. The technique presented here is described in a post by Robert Kantner on the cdh-users mailing list.

Caveat Emptor

Before I start I would like to issue a warning: As is always the case with recursion, you have to make sure that the stopping condition for the recursion is implemented correctly. Otherwise you will end up blocking the whole cluster with an infinite number of recursively created workflow jobs.

As of Oozie version 4.1.0, the maximum sub-workflow depth is limited to 50, as described in OOZIE-1550.

Recursive Call

Here we show the action that is responsible for the recursive call. The sub-workflow points to its own workflow file. To implement a termination condition, we increase the variable counter by one and pass it as a parameter to the recursive call.

    <action name="loop">
        <sub-workflow>
            <app-path>${wf:appPath()}/../loop.xml</app-path>
            <configuration>
                <property>
                    <name>counter</name>
                    <value>${counter + 1}</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="end"/>
        <error to="kill" />
    </action>

Termination

A decision node checks the stopping condition for the recursion. In this example we check if there is an input file for the current counter value.

    <decision name="check-if-done">
        <switch>
            <case to="process-data">${not empty wf:conf(concat("input_file_", counter))}</case>
            <default to="end" />
        </switch>
    </decision>

Configuration

The list of files can be specified in the workflow configuration file. The following configuration passes a list of three files to our example workflow:

input_file_1=myinput1.txt
input_file_2=myinput2.txt
input_file_3=myinput3.txt

Complete Example

The following code shows the complete workflow. It may be called by another workflow that passes in the initial value for the variable counter.

<workflow-app xmlns="uri:oozie:workflow:0.5" name="loop-example">

    <parameters>
        <property>
            <name>counter</name>
        </property>
    </parameters>

    <start to="check-if-done"/>

    <decision name="check-if-done">
        <switch>
            <case to="process-data">${not empty wf:conf(concat("input_file_", counter))}</case>
            <default to="end" />
        </switch>
    </decision>

    <action name="process-data">
        <!-- actual logic happens here -->
        <ok to="loop"/>
        <error to="kill" />
    </action>

    <action name="loop">
        <sub-workflow>
            <app-path>${wf:appPath()}/../loop.xml</app-path>
            <configuration>
                <property>
                    <name>counter</name>
                    <value>${counter + 1}</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="end"/>
        <error to="kill" />
    </action>


    <kill name="kill">
        <message>Action failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>


    <end name="end" />


</workflow-app>
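
Besides being called from a parent workflow, the example can also be started directly with the Oozie command line client. The following is a minimal sketch, assuming the workflow definition has been uploaded to HDFS as /apps/loop-example/loop.xml and that the Oozie server runs on a host named oozieserver; all hostnames and paths are placeholders.

# write a minimal job.properties; hostnames and paths are placeholders
cat > job.properties <<'EOF'
nameNode=hdfs://namenode:8020
oozie.wf.application.path=${nameNode}/apps/loop-example/loop.xml
counter=1
input_file_1=myinput1.txt
input_file_2=myinput2.txt
input_file_3=myinput3.txt
EOF

# submit and run the workflow (port 11000 is the Oozie default)
oozie job -oozie http://oozieserver:11000/oozie -config job.properties -run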

Tools For Working With Archives

Here I collect some useful tools for working with zip/jar/gz files. As always: just snippets and short descriptions.

List the contents of an archive

To list the contents of a jar/war/zip archive, we type the following:

jar tf frontend.war
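
If the jar tool is not available, unzip can list the contents of the archive as well:

unzip -l frontend.war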

Extract a file

To extract a file from the archive, type something like

jar xf frontend.war config/app-conf.xml

This creates a folder named config with the file app-conf.xml inside. Now you can edit the config file.

Update a file

After you are done editing the file, you can update the archive by typing

jar uf frontend.war config/app-conf.xml

View the contents of a file within the archive

For having a quick look at the contents of a file you can use the unzip command:

unzip -q -c frontend.war config/app-conf.xml

(The -c switch redirects the output to stdout, the -q(uiet) switch suppresses all other output.)

Test the integrity of archives

gunzip -t *.gz

will report broken gzip archives without unpacking them.
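
To check a whole directory at once, a small shell loop can print the names of all broken archives (a minimal sketch that relies on gunzip -t exiting non-zero for a corrupt file):

for f in *.gz; do
    gunzip -t "$f" 2>/dev/null || echo "broken: $f"
done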

Access Your Hadoop Cluster Over SSH

Easy SSH access

First you need to set up passwordless SSH access using a public key. Refer to this guide to set up SSH access using SSH keys.

To enable easy SSH access to a machine (this is not limited to Hadoop), add a section like this to your ~/.ssh/config:

# demoproject staging cluster
Host stagingcluster
HostName 79.148.37.213
User root
IdentityFile ~/.ssh/mykey_rsa

Now you can ssh into the remote machine using the command

ssh stagingcluster
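
The alias also works for file transfers. For example, to copy a file to the cluster (the file name is just a placeholder):

scp myjob.jar stagingcluster:/tmp/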

Access the cluster from your developer machine

Open a socks proxy

Add the following line to your ~/.bash_profile

alias opensocks='ssh -f -N -D 7070 stagingcluster'

Then open the proxy by simply typing

opensocks
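
You can check that the proxy works by sending a request through it with curl (example.com is just a test URL; the request leaves through the remote machine's network):

curl --socks5-hostname localhost:7070 http://example.com/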

Access the Resource Manager using the proxy

Use Firefox with the FoxyProxy extension and point it at the SOCKS proxy on localhost:7070.
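
As an alternative to a browser extension, Chromium-based browsers can be started with the proxy configured on the command line. The binary name and the ResourceManager address below are assumptions; port 8088 is the default port of the YARN web UI:

chromium-browser --proxy-server="socks5://localhost:7070" http://resourcemanager.example.com:8088/cluster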

HDFS access

To access HDFS from your developer machine you need a local copy of your Hadoop distribution. Configure it to point to your cluster (e.g. download the Hadoop client configuration from Cloudera Manager).

Then add the following configuration to your hdfs-site.xml (Thanks to Stephan Friese for showing me this):

  <property>
    <name>hadoop.socks.server</name>
    <value>localhost:7070</value>
  </property>
  <property>
    <name>hadoop.rpc.socket.factory.class.default</name>
    <value>org.apache.hadoop.net.SocksSocketFactory</value>
  </property>
  <property>
    <name>dfs.client.use.legacy.blockreader</name>
    <value>true</value>
  </property>

Now you can do things such as

hdfs dfs -ls
hdfs dfs -put somefile somewhere

You may also have to add the namenode to your /etc/hosts.
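
Such an entry could look like this (the IP address and hostname are hypothetical placeholders, use the values of your cluster):

# hypothetical /etc/hosts entry for the namenode
10.0.0.10    namenode.stagingcluster.internal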