PUB-Web consists of three types of peer-to-peer nodes: workers, which run threads of parallel programs using their available idle time; supernodes, which manage the network, i.e., balance the workload and authenticate peers (the current implementation requires exactly one supernode per network because the load balancer is not yet implemented in a distributed fashion); and consumer nodes, through which users run their parallel programs. A physical node can play one, two, or all three roles in a PUB-Web network.
Before a worker, consumer, or supernode can be started, a few changes to the configuration files (included in the binary download bundle) are necessary:
- All occurrences of __ABSOLUTE_PATH__ in etc/security.policy have to be replaced with the absolute installation path; on Windows systems, path names must use double backslashes, because a single backslash starts an escape sequence.
- For a setup beyond localhost, the correct IP addresses have to be entered for the padrmi.bind.address property in the configuration files etc/pubweb-worker.conf, etc/pubweb-consumer.conf, and etc/pubweb-supernode.conf. For worker and consumer nodes, the IP address of the supernode also has to be specified via the pubweb.supernode.host property.
- If some of the machines are located in a NAT'ed subnetwork, port forwarding needs to be configured on the router; the public IP address and the port forwarded by the router need to be specified via the padrmi.address and padrmi.port properties.
- User accounts are set up in etc/pubweb-passwd.conf (on the supernode only). The corresponding logins and passwords have to be specified on the consumer nodes via the pubweb.username and pubweb.password properties in etc/pubweb-consumer.conf.
- In etc/pubweb-worker.conf the absolute path to the java executable must be specified via the pubweb.process.exec property.
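The placeholder substitution described in the first item can be scripted. The following is a minimal sketch of a hypothetical helper (PolicyPathFix is not part of PUB-Web) that replaces every occurrence of __ABSOLUTE_PATH__ and doubles each backslash so that Windows paths survive escape processing; it assumes the policy file is UTF-8 text.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of PUB-Web) for preparing etc/security.policy. */
public class PolicyPathFix {
    static final String PLACEHOLDER = "__ABSOLUTE_PATH__";

    // Doubles every backslash: C:\pubweb becomes C:\\pubweb, because a single
    // backslash would start an escape sequence in the policy file.
    // Unix paths contain no backslashes and are returned unchanged.
    static String escapeForPolicy(String path) {
        return path.replace("\\", "\\\\");
    }

    // Replaces all occurrences of the placeholder with the escaped path.
    static String substitute(String policyText, String installPath) {
        return policyText.replace(PLACEHOLDER, escapeForPolicy(installPath));
    }

    // args[0]: path to etc/security.policy, args[1]: absolute installation path
    public static void main(String[] args) throws IOException {
        Path policy = Paths.get(args[0]);
        String text = new String(Files.readAllBytes(policy), StandardCharsets.UTF_8);
        Files.write(policy, substitute(text, args[1]).getBytes(StandardCharsets.UTF_8));
    }
}
```

For example, `java PolicyPathFix etc/security.policy C:\pubweb` would write C:\\pubweb into the policy file (the installation path is illustrative). Running it a second time is harmless, since the placeholder no longer occurs.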
Before running your own BSP programs, one more change to etc/pubweb-consumer.conf is necessary: add the path where the .class files of your program will be located to the padrmi.path property; .jar files are supported as well.
We assume that you are already familiar with the Java programming language as well as with the BSP model and basic BSP programming principles. If you are not familiar with BSP, we recommend the book Parallel Scientific Computation: A Structured Approach using BSP and MPI by Rob H. Bisseling (Oxford University Press, 2004), an excellent guide to parallel scientific computation using BSP.
You may also wish to get inspired by the source code of the examples and the compile scripts in the SVN repository.
As user programs intended to run on PUB-Web have to be BSP programs, they must implement the interface BSPProgram, i.e., provide a method with this signature:
public void bspMain(BSP bspLib, Serializable args) throws AbortedException
The first parameter is a reference to the PUB-Web runtime environment, through which the BSP API methods are accessed; the second parameter holds the arguments passed to the BSP program, which is a String[] if the program is started from a command prompt, or any serializable Java object if the interoperability API (see below) is used.
To allow the load balancer to migrate BSP programs at runtime, they must be compiled with the PadMig compiler and implement the BSPMigratableProgram interface instead, whose main method has a different signature:
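Because args is either a String[] or an arbitrary serializable object, a program meant to work in both cases has to check its runtime type. The sketch below shows one way to do this; the helper name problemSize and the convention of passing the problem size as the first command-line argument or as an Integer are assumptions for illustration only.

```java
import java.io.Serializable;

/** Hypothetical helper for interpreting the args parameter of bspMain(). */
public class ArgsExample {
    // Returns the problem size taken from args:
    // - String[] (program started from a command prompt): first element, parsed
    // - Integer (assumed convention when using the interoperability API)
    // - anything else, including null: the given default
    static int problemSize(Serializable args, int defaultSize) {
        if (args instanceof String[]) {
            String[] argv = (String[]) args;
            return argv.length > 0 ? Integer.parseInt(argv[0]) : defaultSize;
        }
        if (args instanceof Integer) {
            return (Integer) args;
        }
        return defaultSize;
    }
}
```

Inside bspMain(), one would then write, e.g., `int size = ArgsExample.problemSize(args, 1000);`. Java arrays implement Serializable, which is why a String[] can be passed through the same parameter.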
@Migratory public void bspMain(BSPMigratable bspLib, Serializable args) throws AbortedException
In the following, we discuss the BSP library functions that can be accessed via the BSP and BSPMigratable interfaces, respectively, both of which are implemented by the PUB-Web runtime environment. In non-migratory programs, the barrier synchronization is entered by calling:
public void sync()
By default, the migratory version additionally creates a backup copy of the execution state and performs migrations if the load balancer suggests them. The second version of this method lets you suppress the migration check and/or the creation of a backup copy (note that when backup copies are suppressed, the BSP program crashes if any of its processes crashes in the current superstep, regardless of whether older backup copies from earlier supersteps exist):
@Migratory public void syncMig()
@Migratory public void syncMig(boolean mayMigrate, boolean createSnapshot)
A message, which can be any serializable Java object, is sent with one of these methods; the second and third methods broadcast a message to an interval of process IDs and to an arbitrary subset of the BSP processes, respectively:
public void send(int to, Serializable msg) throws IntegrityException
public void send(int pidLow, int pidHigh, Serializable msg) throws IntegrityException
public void send(int[] pids, Serializable msg) throws IntegrityException
Messages sent in the previous superstep can be accessed with these methods, where the find* methods are for accessing messages of a specific sender:
public int getNumberOfMessages()
public Message getMessage(int index) throws IntegrityException
public Message[] getAllMessages()
public Message findMessage(int src, int index) throws IntegrityException
public Message[] findAllMessages(int src) throws IntegrityException
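To make the superstep semantics of these methods concrete, here is a toy, single-JVM model of message delivery. It is not PUB-Web's implementation and uses its own hypothetical names (ToyMailbox, Msg); it only mirrors the documented behaviour that messages sent during one superstep become readable after the next barrier. The sender's pid is passed explicitly because there is no runtime environment, and the model assumes that the interval pidLow..pidHigh is inclusive, which the text above does not specify.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Toy model of BSP message delivery (NOT PUB-Web's implementation). */
public class ToyMailbox {
    /** A delivered message: source pid and content. */
    static final class Msg {
        final int source;
        final Serializable content;
        Msg(int source, Serializable content) { this.source = source; this.content = content; }
    }

    private final List<List<Msg>> outgoing = new ArrayList<>(); // sent in the current superstep
    private final List<List<Msg>> incoming = new ArrayList<>(); // readable in the current superstep

    ToyMailbox(int nProcs) {
        for (int i = 0; i < nProcs; i++) {
            outgoing.add(new ArrayList<>());
            incoming.add(new ArrayList<>());
        }
    }

    void send(int from, int to, Serializable msg) {
        outgoing.get(to).add(new Msg(from, msg));
    }

    // Assumption: pidHigh is inclusive.
    void send(int from, int pidLow, int pidHigh, Serializable msg) {
        for (int p = pidLow; p <= pidHigh; p++) send(from, p, msg);
    }

    void send(int from, int[] pids, Serializable msg) {
        for (int p : pids) send(from, p, msg);
    }

    // Barrier: messages sent in this superstep become readable;
    // messages from the previous superstep are discarded.
    void sync() {
        for (int p = 0; p < incoming.size(); p++) {
            incoming.get(p).clear();
            incoming.get(p).addAll(outgoing.get(p));
            outgoing.get(p).clear();
        }
    }

    int getNumberOfMessages(int pid) {
        return incoming.get(pid).size();
    }

    // Counterpart of findAllMessages(src) as seen by process pid.
    List<Msg> findAllMessages(int pid, int src) {
        List<Msg> result = new ArrayList<>();
        for (Msg m : incoming.get(pid)) {
            if (m.source == src) result.add(m);
        }
        return result;
    }
}
```

Two buffers per process are a common way to obtain this behaviour: sends go to the outgoing buffer, and the barrier moves them to the incoming buffer, dropping what was readable in the previous superstep.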
A received message is encapsulated in a Message object. The message content and the sender's process ID can be obtained with these methods:
public Serializable getContent()
public int getSource()
To terminate all processes of a BSP program, e.g., in case of an error, the following method has to be called; the Throwable parameter is transmitted to the PUB-Web user who started the program:
public void abort(Throwable cause) throws AbortedException
Any output to stdout or stderr should be printed using the following methods as they display it on the command prompt of the user who has started the BSP program rather than on the computer where the processes are actually running:
public void printStdOut(String line)
public void printStdErr(String line)
Besides writing to stdout and stderr, BSP programs can output any serializable Java object, which is especially useful when a BSP program is started using the interoperability API (see below):
public void writeRawData(Serializable data)
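Any class used with writeRawData() (or passed as progArgs via the interoperability API) must be serializable, including all of its non-transient fields. A local round trip through Java serialization catches such mistakes before a job is submitted. This sketch assumes that PUB-Web transfers these objects with standard Java serialization; PartialSum is a hypothetical result type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RawDataExample {
    /** Hypothetical result object a BSP process might send back. */
    public static class PartialSum implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int pid;
        public final long sum;
        public PartialSum(int pid, long sum) { this.pid = pid; this.sum = sum; }
    }

    // Serializes and deserializes obj; throws NotSerializableException
    // (an IOException) if obj or one of its fields is not serializable.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Inside bspMain(), a process could then report its result with `bsp.writeRawData(new RawDataExample.PartialSum(bsp.getProcessId(), sum))`, where sum is a hypothetical local variable.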
By default the output is sent back to the user asynchronously in the background so that these methods immediately return, even if delivering the output takes some time due to network delays. However, in some cases it might be necessary to ensure that the output is delivered before proceeding, e.g., before a program is terminated using the abort() method. This can be achieved using this method:
public void flush()
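The interplay of asynchronous output and flush() can be modelled with a single background thread. The following illustrates the pattern under that assumption and is not PUB-Web's actual implementation; AsyncOutput and its methods are hypothetical names, and Thread.sleep stands in for network delay.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Model of asynchronous output with a blocking flush (hypothetical names). */
public class AsyncOutput {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    // Returns immediately; "delivery" happens on the background thread.
    public void print(String line) {
        sender.execute(() -> {
            try {
                Thread.sleep(10); // stand-in for network delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            delivered.add(line);
        });
    }

    // A single-thread executor runs tasks in submission order, so when this
    // no-op task has completed, every earlier print() task has completed too.
    public void flush() throws InterruptedException, ExecutionException {
        sender.submit(() -> { }).get();
    }

    public List<String> delivered() {
        return delivered;
    }

    public void shutdown() {
        sender.shutdown();
    }
}
```

The design relies on the FIFO order of the single worker thread: flush() does not need to track individual messages, it only waits for a marker task queued behind them.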
To access data from files, the following method should be used. In particular, any file in the BSP program folder of the PUB-Web user's peer can be read with it:
public InputStream getResourceAsStream(String name) throws IOException, MalformedURLException
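Since getResourceAsStream() returns a plain InputStream, the usual Java I/O classes can be layered on top of it. A minimal sketch of a hypothetical helper that reads a text resource line by line, assuming UTF-8 encoding:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reads all lines of a text resource (assumed UTF-8). */
public class ResourceLines {
    // Reads the stream to the end and closes it.
    static List<String> readLines(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
```

In a BSP program one might write `List<String> lines = ResourceLines.readLines(bsp.getResourceAsStream("input.txt"));`, where input.txt is a hypothetical file in the program folder. Catching IOException is sufficient, because MalformedURLException is a subclass of it.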
In migratable programs, an additional method is available that may be called to mark points inside long supersteps where a migration is safe (i.e., no files are open, etc.):
@Migratory public boolean mayMigrate()
Furthermore, there are service functions to obtain, e.g., the number of processes of the BSP program (getNumberOfProcessors()) and the process's own ID (getProcessId()), both of which are used in the following example.
The following example program demonstrates the basic BSP features, especially how to send and receive messages:
```java
import java.io.Serializable;

import pubweb.*;
import pubweb.bsp.*;

public class MessagePassing implements BSPProgram {
    public void bspMain(BSP bsp, Serializable args) throws AbortedException {
        // calculate neighbours in a ring of n processes
        int pid = bsp.getProcessId();
        int n = bsp.getNumberOfProcessors();
        int left = (pid - 1 + n) % n;
        int right = (pid + 1) % n;
        try {
            bsp.send(left, Integer.valueOf(1));
            bsp.send(right, Integer.valueOf(2));
        } catch (IntegrityException ie) {
            bsp.printStdErr("sending failed: " + ie.getMessage());
        }
        bsp.sync();

        // get all my messages, method 1: by index
        int numMessages = bsp.getNumberOfMessages();
        for (int i = 0; i < numMessages; i++) {
            try {
                Message msg = bsp.getMessage(i);
                bsp.printStdOut("got " + msg.getContent() + " from pid " + msg.getSource()
                        + " in superstep " + msg.getSuperstep());
            } catch (IntegrityException ie) {
                bsp.printStdErr("receiving failed: " + ie.getMessage());
            }
        }

        // get all my messages, method 2: as an array
        Message[] msgs = bsp.getAllMessages();
        bsp.printStdOut("got in total " + msgs.length + " messages");

        // get messages from a specific pid, method 1: by index
        try {
            int i = 0;
            Message msg;
            while ((msg = bsp.findMessage(0, i++)) != null) {
                bsp.printStdOut("received " + msg.getContent() + " from pid 0 in superstep "
                        + msg.getSuperstep());
            }
        } catch (IntegrityException ie) {
            bsp.printStdErr("receiving failed: " + ie.getMessage());
        }

        // get messages from a specific pid, method 2: as an array
        try {
            msgs = bsp.findAllMessages(0);
            bsp.printStdOut("got " + msgs.length + " messages from pid 0");
        } catch (IntegrityException ie) {
            bsp.printStdErr("receiving failed: " + ie.getMessage());
        }
    }
}
```
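The neighbour computation in the example adds n before taking the remainder. The reason is that Java's % operator keeps the sign of the dividend, so (0 - 1) % n would yield -1 for process 0. The following self-contained sketch isolates this arithmetic (the class and method names are hypothetical) and shows Math.floorMod as an equivalent alternative:

```java
/** Ring-neighbour arithmetic from the example, isolated for testing. */
public class RingNeighbours {
    // (pid - 1) % n would be -1 for pid 0; adding n keeps the dividend non-negative.
    static int left(int pid, int n) {
        return (pid - 1 + n) % n;
    }

    static int right(int pid, int n) {
        return (pid + 1) % n;
    }

    // Equivalent alternative (Java 8+): for n > 0, Math.floorMod returns a value in [0, n).
    static int leftFloorMod(int pid, int n) {
        return Math.floorMod(pid - 1, n);
    }
}
```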
A BSP program need not be launched by a user. Using the interoperability API, BSP programs can be started from other stand-alone software just like an (asynchronous) function call: the PUB-Web peer software can be embedded in GUI-less mode into other applications. Among other things, the API provides a function to start a BSP program and a callback interface to receive the output and, possibly, error messages of the parallel program. Parameters passed to the BSP program as well as the output sent back are not restricted to strings but can be any serializable object.
First, one needs to create an instance of a PUB-Web peer in consumer mode:
Consumer consumer = new Consumer("/path-to/config-file.conf");
BSP programs are started using the following method, where desc is the description of the BSP program that appears in the process list, nProcs is the number of requested BSP processes, mainClass is the fully qualified name of the BSP program's main class, and progArgs can be any serializable Java object to be passed as argument to the BSP program:
public Job newJob(String desc, int nProcs, String mainClass, Serializable progArgs) throws ClassNotFoundException, IntegrityException, InternalException, MalformedURLException, NotConnectedException, NotEnoughWorkersException, PpException
In order to asynchronously receive the output and status updates of the BSP program, an event listener is required, which can be added / removed using these methods:
public void addConsumerEventListener(ConsumerEventListener cel)
public void removeConsumerEventListener(ConsumerEventListener cel)
The event listener interface, ConsumerEventListener, contains methods to receive output and status messages of BSP programs:
public void printToStdOut(Job job, int pid, String line)
public void printToStdErr(Job job, int pid, String line)
public void writeRawData(Job job, int pid, Serializable data)
public void printStatusLine(Job job, int pid, String line)
public void processExited(Job job, int pid)
public void jobExited(Job job)
public void processRolledBack(Job job, int pid, int superstep)
public void jobDiedOnError(Job job, int pid, Throwable cause)
public void jobAborted(Job job, int pid, Throwable cause)
public void jobListChanged()
To keep track of pending, running, and finished jobs, and to kill running jobs or dispose of finished ones, the following methods of a Consumer instance can be used:
public synchronized Job[] getWaitingJobs()
public synchronized Job[] getActiveJobs()
public synchronized Job[] getFinishedJobs()
public synchronized void updateJobList() throws PpException, NotConnectedException
public synchronized void killJob(Job job) throws PpException, IntegrityException, InternalException, NotConnectedException
public synchronized void disposeJob(Job job) throws IntegrityException
The following minimalistic example shows how to run a BSP program using the interoperability interface:
```java
import java.io.Serializable;

import padrmi.*;
import pubweb.*;
import pubweb.user.*;

public class IopExample implements ConsumerEventListener {
    private Consumer consumer;

    public IopExample() {
        try {
            consumer = new Consumer("/path-to/config-file.conf");
            consumer.addConsumerEventListener(this);
            consumer.newJob("Example", 16, "myPackage.MyBspProgram", null);
        } catch (ClassNotFoundException e) {
            System.err.println("BSP program not found: " + e.getMessage());
        } catch (IntegrityException e) {
            System.err.println("integrity violated: " + e.getMessage());
        } catch (NotConnectedException e) {
            System.err.println("no supernode connection available: " + e.getMessage());
        } catch (NotEnoughWorkersException e) {
            System.err.println("not enough peers available: " + e.getMessage());
        } catch (PpAuthorizationException e) {
            System.err.println("authentication failed: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("operation failed:");
            e.printStackTrace();
        }
    }

    public void printToStdOut(Job job, int pid, String line) {
        System.out.println(job + "[" + pid + "]: " + line);
    }

    public void printToStdErr(Job job, int pid, String line) {
        System.err.println(job + "[" + pid + "]: " + line);
    }

    public void writeRawData(Job job, int pid, Serializable data) {
        System.out.println(job + "[" + pid + "]: data: " + data);
    }

    public void printStatusLine(Job job, int pid, String line) {
        System.out.println(job + "[" + pid + "]: status: " + line);
    }

    public void processExited(Job job, int pid) {
        System.out.println("process " + job + "[" + pid + "] completed");
    }

    public void jobExited(Job job) {
        System.out.println("job " + job + " completed");
    }

    public void processRolledBack(Job job, int pid, int superstep) {
        System.out.println("process " + job + "[" + pid + "] restored in superstep " + superstep);
    }

    public void jobDiedOnError(Job job, int pid, Throwable cause) {
        System.err.println("job " + job + " crashed at pid " + pid + " because of: " + cause.getMessage());
        try {
            consumer.killJob(job);
        } catch (Exception any) {
            // ignore: the job is being cleaned up anyway
        }
    }

    public void jobAborted(Job job, int pid, Throwable cause) {
        System.err.println("job " + job + " aborted at pid " + pid + " because of: " + cause.getMessage());
        try {
            consumer.killJob(job);
        } catch (Exception any) {
            // ignore: the job is being cleaned up anyway
        }
    }

    public void jobListChanged() {
    }
}
```