Configuration

PUB-Web consists of three different types of Peer-to-Peer nodes: workers that run some threads of parallel programs utilizing the available idle time; supernodes that are required to manage the network, i.e., balance the workload and authenticate peers (in the current implementation exactly one supernode is required per network because the load balancer is not yet implemented in a distributed fashion); and consumer nodes via which users can run their parallel programs. A physical node can play one, two, or all three roles in the PUB-Web network.

Before the worker, consumer, or supernode role of PUB-Web can be started, a few adaptations to the configuration files (included in the binary download bundle) are necessary.


The PUB-Web API

First, one more adaptation in pubweb-consumer.conf is necessary: add the path where the .class files of your program will be located to the padrmi.path property; .jar files are supported as well.

We assume that you are already familiar with the Java programming language as well as with the BSP model and basic BSP programming principles. If you are not yet familiar with BSP, we recommend the book Parallel Scientific Computation: A Structured Approach using BSP and MPI by Rob H. Bisseling, Oxford University Press, 2004, which is an excellent guide to parallel scientific computation using BSP.

The source code of the examples and the compile scripts in the SVN repository may also serve as a starting point.

As user programs intended to run on PUB-Web have to be BSP programs, the interface BSPProgram must be implemented, i.e., the program must have a method with this signature:

public void bspMain(BSP bspLib, Serializable args) throws AbortedException

Its first parameter is a reference to the PUB-Web runtime environment, through which the BSP API methods are accessed; the second parameter holds the arguments passed to the BSP program, which is a String[] if the BSP program is started from a command prompt, or any serializable Java object if the interoperability API (see below) is used.
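
Since the args parameter can arrive either as a String[] or as some other serializable object, a program that supports both launch modes has to inspect its type. The following is a minimal sketch of such a check; the class name ProgramArgs, the idea of a "problem size" argument, and the choice of Integer for the interoperability case are assumptions made for illustration and are not part of PUB-Web.

```java
import java.io.Serializable;

/** Hypothetical helper: extracts one integer argument from either launch mode. */
final class ProgramArgs {
    private ProgramArgs() {}

    /**
     * Returns the problem size carried by args.
     * A String[] (command prompt) is parsed from its first element,
     * an Integer (assumed interoperability case) is used directly,
     * and anything else, including null, yields defaultSize.
     */
    static int problemSize(Serializable args, int defaultSize) {
        if (args instanceof String[]) {
            String[] argv = (String[]) args;
            return argv.length > 0 ? Integer.parseInt(argv[0]) : defaultSize;
        }
        if (args instanceof Integer) {
            return (Integer) args;
        }
        return defaultSize;
    }
}
```

Inside bspMain, such a helper would be called as ProgramArgs.problemSize(args, 100), where 100 is an arbitrary default.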

In order to enable BSP programs to be migrated at runtime by the load balancer, they need to be compiled using the PadMig compiler and need to implement the BSPMigratableProgram interface instead, which means that the main method has this different signature:

@Migratory public void bspMain(BSPMigratable bspLib, Serializable args) throws AbortedException

In the following, we discuss the BSP library functions, which can be accessed via the BSP and BSPMigratable interfaces, respectively; both are implemented by the PUB-Web runtime environment. In non-migratory programs, the barrier synchronization is entered by calling:

public void sync()

The migratory version by default additionally creates a backup copy of the execution state and performs migrations if suggested by the load balancer; you can suppress the migration check and/or creation of a backup copy using the second version of this method (note that, when suppressing backup copies, the BSP program crashes if one process crashes in the current superstep, no matter if there are older backup copies from earlier supersteps):

@Migratory public void syncMig()
@Migratory public void syncMig(boolean mayMigrate, boolean createSnapshot)

A message, which can be any serializable Java object, can be sent with the following methods. The first one sends to a single process; the latter two broadcast a message to an interval and to an arbitrary subset of the BSP processes, respectively:

public void send(int to, Serializable msg) throws IntegrityException
public void send(int pidLow, int pidHigh, Serializable msg) throws IntegrityException
public void send(int[] pids, Serializable msg) throws IntegrityException

Messages sent in the previous superstep can be accessed with these methods, where the find* methods are for accessing messages of a specific sender:

public int getNumberOfMessages()
public Message getMessage(int index) throws IntegrityException
public Message[] getAllMessages()
public Message findMessage(int src, int index) throws IntegrityException
public Message[] findAllMessages(int src) throws IntegrityException

A received message is encapsulated in a Message object. The message itself as well as the sender ID can be obtained with these methods:

public Serializable getContent()
public int getSource()
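
To make the superstep semantics concrete, the following toy model mimics, inside a single JVM, how a message sent in one superstep becomes readable only after the next barrier. It is a sketch for illustration only: ToySuperstepMailbox and ToyMessage are hypothetical names, the explicit sender argument of send() replaces the implicit process identity of the real API, and discarding older messages at each barrier is an assumption of the toy.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Toy single-JVM model of BSP message delivery; not part of PUB-Web. */
final class ToySuperstepMailbox {

    /** Mirrors the two accessors of the Message object described above. */
    static final class ToyMessage {
        private final int source;
        private final Serializable content;

        ToyMessage(int source, Serializable content) {
            this.source = source;
            this.content = content;
        }

        int getSource() { return source; }
        Serializable getContent() { return content; }
    }

    // Messages sent during the current superstep, indexed by receiver.
    private final List<List<ToyMessage>> pending = new ArrayList<>();
    // Messages readable during the current superstep, indexed by receiver.
    private final List<List<ToyMessage>> delivered = new ArrayList<>();

    ToySuperstepMailbox(int nProcs) {
        for (int i = 0; i < nProcs; i++) {
            pending.add(new ArrayList<>());
            delivered.add(new ArrayList<>());
        }
    }

    /** Queues msg from process 'from' for process 'to'; invisible until sync(). */
    void send(int from, int to, Serializable msg) {
        pending.get(to).add(new ToyMessage(from, msg));
    }

    /** Ends the superstep: queued messages become readable, older ones are dropped. */
    void sync() {
        for (int pid = 0; pid < pending.size(); pid++) {
            delivered.set(pid, pending.get(pid));
            pending.set(pid, new ArrayList<>());
        }
    }

    int getNumberOfMessages(int pid) { return delivered.get(pid).size(); }

    ToyMessage getMessage(int pid, int index) { return delivered.get(pid).get(index); }
}
```

In this model, process 1 sees no message directly after send(0, 1, "hello"), exactly one after the first sync(), and none again after a second sync().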

In order to terminate all the processes of a BSP program, e.g. in case of an error, the following method has to be called; the Throwable parameter will be transmitted to the PUB-Web user who has started the program:

public void abort(Throwable cause) throws AbortedException

Any output to stdout or stderr should be printed using the following methods as they display it on the command prompt of the user who has started the BSP program rather than on the computer where the processes are actually running:

public void printStdOut(String line)
public void printStdErr(String line)

Besides writing to stdout and stderr, BSP programs can output any serializable Java object, which is especially useful when a BSP program is started using the interoperability API (see below):

public void writeRawData(Serializable data)

By default the output is sent back to the user asynchronously in the background so that these methods immediately return, even if delivering the output takes some time due to network delays. However, in some cases it might be necessary to ensure that the output is delivered before proceeding, e.g., before a program is terminated using the abort() method. This can be achieved using this method:

public void flush()
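
The order of calls matters here: report the error, flush, and only then abort. The sketch below shows that order against a small stand-in interface so that it compiles without the PUB-Web library. The names FlushBeforeAbort, Output, and Recorder are hypothetical, and the stand-in abort() omits the AbortedException declared by the real method.

```java
import java.util.ArrayList;
import java.util.List;

final class FlushBeforeAbort {
    private FlushBeforeAbort() {}

    /** Hypothetical stand-in exposing only the three calls used in this sketch. */
    interface Output {
        void printStdErr(String line);
        void flush();
        void abort(Throwable cause);
    }

    /** Reports the error, waits until the output is delivered, then terminates. */
    static void failCleanly(Output bsp, Throwable cause) {
        bsp.printStdErr("fatal: " + cause.getMessage());
        bsp.flush();      // without this, the line above might still be in transit
        bsp.abort(cause);
    }

    /** Test double that records the order in which the calls arrive. */
    static final class Recorder implements Output {
        final List<String> calls = new ArrayList<>();

        @Override public void printStdErr(String line) { calls.add("printStdErr"); }
        @Override public void flush() { calls.add("flush"); }
        @Override public void abort(Throwable cause) { calls.add("abort"); }
    }
}
```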

To access data from files, the following method should be used. In particular, any file in the BSP program folder of the PUB-Web user's peer can be read with it:

public InputStream getResourceAsStream(String name) throws IOException, MalformedURLException
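
The returned InputStream can be consumed with the standard Java I/O classes. As a minimal sketch, the helper below reads a stream line by line and closes it afterwards; the class name ResourceLines is hypothetical, and the UTF-8 encoding as well as the conversion of IOException into UncheckedIOException are assumptions of this example, not requirements of PUB-Web.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for reading a text resource completely. */
final class ResourceLines {
    private ResourceLines() {}

    /** Reads all lines from in (assumed to be UTF-8 text) and closes the stream. */
    static List<String> readLines(InputStream in) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            // Rethrown unchecked so that callers can decide where to handle it.
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

Within bspMain, a call could look like ResourceLines.readLines(bsp.getResourceAsStream("input.txt")), where input.txt is a placeholder file name.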

In migratable programs there is also a method available which may be called to mark additional points inside long supersteps where a migration is safe (i.e. no open files etc.):

@Migratory public boolean mayMigrate()
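
One way to use such migration points is to offer one after every fixed number of work items inside a long loop. The sketch below illustrates only this control flow: the class MigrationPoints is hypothetical, a BooleanSupplier stands in for bsp.mayMigrate(), and the return value is ignored because its meaning is not described here. In a real migratable program the enclosing method would additionally have to be marked @Migratory and compiled with the PadMig compiler.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of periodic migration points in a long computation. */
final class MigrationPoints {
    private MigrationPoints() {}

    /**
     * Sums the numbers 0 .. items-1 and offers a migration point
     * after every 'interval' processed items. Returns the sum.
     */
    static long sumWithMigrationPoints(int items, int interval, BooleanSupplier mayMigrate) {
        long sum = 0;
        for (int i = 0; i < items; i++) {
            sum += i;
            if ((i + 1) % interval == 0) {
                // No files or other external resources are open at this point.
                mayMigrate.getAsBoolean();
            }
        }
        return sum;
    }
}
```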

Furthermore, there are some service functions to obtain the number of processes of the BSP program, the own process ID, and so on.

The following example program demonstrates the basic BSP features, especially how to send and receive messages:

import java.io.Serializable;

import pubweb.*;
import pubweb.bsp.*;

public class MessagePassing implements BSPProgram {

    public void bspMain(BSP bsp, Serializable args) throws AbortedException {
        // calculate neighbours
        int pid = bsp.getProcessId();
        int n = bsp.getNumberOfProcessors();
        int left = (pid - 1 + n) % n;
        int right = (pid + 1) % n;

        try {
            bsp.send(left, Integer.valueOf(1));
            bsp.send(right, Integer.valueOf(2));
        } catch (IntegrityException ie) {
            bsp.printStdErr("sending failed: " + ie.getMessage());
        }

        bsp.sync();

        // get all my messages, method 1
        n = bsp.getNumberOfMessages();
        for (int i = 0; i < n; i++) {
            try {
                Message msg = bsp.getMessage(i);
                bsp.printStdOut("got " + msg.getContent() + " from pid " + msg.getSource() + " in superstep " + msg.getSuperstep());
            } catch (IntegrityException ie) {
                bsp.printStdErr("receiving failed: " + ie.getMessage());
            }
        }

        // get all my messages, method 2
        Message[] msgs = bsp.getAllMessages();
        bsp.printStdOut("got in total " + msgs.length + " messages");

        // get messages from some specified pid, method 1
        try {
            int i = 0;
            Message msg;
            while ((msg = bsp.findMessage(0, i++)) != null) {
                bsp.printStdOut("received " + msg.getContent() + " from pid 0 in superstep " + msg.getSuperstep());
            }
        } catch (IntegrityException ie) {
            bsp.printStdErr("receiving failed: " + ie.getMessage());
        }

        // get messages from some specified pid, method 2
        try {
            msgs = bsp.findAllMessages(0);
            bsp.printStdOut("got " + msgs.length + " messages from pid 0");
        } catch (IntegrityException ie) {
            bsp.printStdErr("receiving failed: " + ie.getMessage());
        }
    }
}
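
The neighbour calculation at the beginning of bspMain arranges the processes in a ring. Adding n before taking the remainder keeps the left operand non-negative, because in Java the expression (0 - 1) % n evaluates to -1 rather than n - 1. The following stand-alone sketch isolates this arithmetic; the class name RingNeighbours is hypothetical.

```java
/** Hypothetical helper isolating the ring arithmetic of the example program. */
final class RingNeighbours {
    private RingNeighbours() {}

    /** Left neighbour of pid in a ring of n processes (wraps from 0 to n - 1). */
    static int left(int pid, int n) {
        return (pid - 1 + n) % n;
    }

    /** Right neighbour of pid in a ring of n processes (wraps from n - 1 to 0). */
    static int right(int pid, int n) {
        return (pid + 1) % n;
    }
}
```

With n = 4, process 0 has the left neighbour 3 and process 3 has the right neighbour 0; with a single process, both neighbours are the process itself.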


The Interoperability Interface

A BSP program need not be launched by a user. Using the interoperability interface, BSP programs can be started from within other stand-alone software just like an (asynchronous) function call: the PUB-Web peer software can be embedded in GUI-less mode into other applications using the interoperability API. Among other things, the API provides a function to start a BSP program and an interface for callback functions to receive the output and, possibly, error messages of the parallel program. Parameters passed to the BSP program as well as the output sent back are not restricted to strings, but can be any (serializable) object.

In particular, one first needs to create an instance of a PUB-Web peer in consumer mode:

Consumer consumer = new Consumer("/path-to/config-file.conf");

BSP programs are started using the following method, where desc is the description of the BSP program appearing in the process list, nProcs is the number of requested BSP processes, mainClass is the fully qualified name of the BSP program's main class, and progArgs can be any serializable Java object to pass as argument to the BSP program:

public Job newJob(String desc, int nProcs, String mainClass, Serializable progArgs)
    throws ClassNotFoundException, IntegrityException, InternalException, MalformedURLException,
        NotConnectedException, NotEnoughWorkersException, PpException

In order to asynchronously receive the output and status updates of the BSP program, an event listener is required, which can be added / removed using these methods:

public void addConsumerEventListener(ConsumerEventListener cel)
public void removeConsumerEventListener(ConsumerEventListener cel)

The event handler interface contains methods to receive output and status messages of BSP programs:

public void printToStdOut(Job job, int pid, String line)
public void printToStdErr(Job job, int pid, String line)
public void writeRawData(Job job, int pid, Serializable data)
public void printStatusLine(Job job, int pid, String line)
public void processExited(Job job, int pid)
public void jobExited(Job job)
public void processRolledBack(Job job, int pid, int superstep)
public void jobDiedOnError(Job job, int pid, Throwable cause)
public void jobAborted(Job job, int pid, Throwable cause)
public void jobListChanged()
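
Because these callbacks arrive asynchronously, a listener that stores output for later use should guard its state against concurrent access. The following minimal sketch collects stdout lines per process ID. It is written without the PUB-Web library: OutputCollector and linesOf are hypothetical names, and Object stands in for the Job type. A real listener would implement ConsumerEventListener with all of its methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical collector grouping output lines by process ID. */
final class OutputCollector {
    private final Map<Integer, List<String>> linesByPid = new TreeMap<>();

    /** Same shape as printToStdOut above, with Object standing in for Job. */
    public synchronized void printToStdOut(Object job, int pid, String line) {
        linesByPid.computeIfAbsent(pid, k -> new ArrayList<>()).add(line);
    }

    /** Returns a copy of the lines received so far from the given process. */
    public synchronized List<String> linesOf(int pid) {
        List<String> lines = linesByPid.get(pid);
        return lines == null ? Collections.<String>emptyList() : new ArrayList<>(lines);
    }
}
```

Returning a copy from linesOf keeps callers from observing the list while a later callback is still modifying it.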

To keep track of pending, running, and finished jobs, and to kill running jobs or dispose of finished ones, the following methods of a Consumer instance can be used:

public synchronized Job[] getWaitingJobs()
public synchronized Job[] getActiveJobs()
public synchronized Job[] getFinishedJobs()
public synchronized void updateJobList() throws PpException, NotConnectedException
public synchronized void killJob(Job job) throws PpException, IntegrityException, InternalException, NotConnectedException
public synchronized void disposeJob(Job job) throws IntegrityException

The following minimalistic example shows how to run a BSP program using the interoperability interface:

import java.io.Serializable;

import padrmi.*;
import pubweb.*;
import pubweb.user.*;

public class IopExample implements ConsumerEventListener {
    
    private Consumer consumer;
    
    public IopExample() {
        try {
            consumer = new Consumer("/path-to/config-file.conf");
            consumer.addConsumerEventListener(this);
            consumer.newJob("Example", 16, "myPackage.MyBspProgram", null);
        } catch (ClassNotFoundException e) {
            System.err.println("BSP program not found: " + e.getMessage());
        } catch (IntegrityException e) {
            System.err.println("integrity violated: " + e.getMessage());
        } catch (NotConnectedException e) {
            System.err.println("no supernode connection available: " + e.getMessage());
        } catch (NotEnoughWorkersException e) {
            System.err.println("not enough peers available: " + e.getMessage());
        } catch (PpAuthorizationException e) {
            System.err.println("authentication failed: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("operation failed:");
            e.printStackTrace();
        }
    }

    public void printToStdOut(Job job, int pid, String line) {
        System.out.println(job + "[" + pid + "]: " + line);
    }

    public void printToStdErr(Job job, int pid, String line) {
        System.err.println(job + "[" + pid + "]: " + line);
    }

    public void writeRawData(Job job, int pid, Serializable data) {
        System.out.println(job + "[" + pid + "]: data: " + data);
    }

    public void printStatusLine(Job job, int pid, String line) {
        System.out.println(job + "[" + pid + "]: status: " + line);
    }

    public void processExited(Job job, int pid) {
        System.out.println("process " + job + "[" + pid + "] completed");
    }

    public void jobExited(Job job) {
        System.out.println("job " + job + " completed");
    }

    public void processRolledBack(Job job, int pid, int superstep) {
        System.out.println("process " + job + "[" + pid + "] restored in superstep " + superstep);
    }

    public void jobDiedOnError(Job job, int pid, Throwable cause) {
        System.err.println("job " + job + " crashed at pid " + pid + " because of: " + cause.getMessage());
        // best effort: the job has already failed, so errors while killing it are ignored
        try { consumer.killJob(job); } catch (Exception any) {}
    }

    public void jobAborted(Job job, int pid, Throwable cause) {
        System.err.println("job " + job + " aborted at pid " + pid + " because of: " + cause.getMessage());
        // best effort: the job has already been aborted, so errors while killing it are ignored
        try { consumer.killJob(job); } catch (Exception any) {}
    }

    public void jobListChanged() {
    }
}

