org.lightwolf
Class Process

java.lang.Object
  extended by org.lightwolf.Process
All Implemented Interfaces:
Serializable
Direct Known Subclasses:
FileProcess

public class Process
extends Object
implements Serializable

A unit of work composed of a set of related flows. A process is a manageable unit of work composed of one or more flows. It can be monitored, paused, or interrupted. If certain conditions are met (such as using only primitives and serializable objects), a process can also be serialized and deserialized, which allows long-running processes to be developed in pure Java.

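This class contains utilities for communication and synchronization between process flows, including wait(Object) and notifyAll(Object, Object), send(Object, Object) and receive(Object), and call(Object, Object) and serve(Object).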

A process is created by calling its constructor. Initially the process contains no flow. A flow can join a process by invoking Flow.joinProcess(Process).

This class can be subclassed. If the implementor wishes to support long-running processes, the methods storeData(Object), loadData() and discardData() must all be implemented.

Author:
Fernando Colombo
See Also:
Flow, Serialized Form

Field Summary
static int ACTIVE
          A constant indicating that the process is active.
static int PASSIVE
          A constant indicating that the process is passive.
protected  int state
           
 
Constructor Summary
Process()
          Creates a new process.
Process(ProcessManager manager)
           
 
Method Summary
static Connection accept(Object matcher)
           
static Connection acceptMany(Object matcher)
           
 void activate()
          Reloads this process and all its flows from external storage back to memory.
 int activeFlows()
          The number of active flows in this process.
 boolean addEventListener(IProcessListener listener)
          Adds an event listener to this process.
static Object call(Object address, Object message)
          Sends a request to the specified address and waits for a response.
static Connection connect(Object matcher)
           
static Connection connectMany(Object matcher)
           
static Process current()
          Returns the current process.
protected  void discardData()
          Called to indicate that the stored data is not necessary anymore.
 Flow[] getFlows()
           
 int getState()
          Returns an int whose value represents the state of this process.
protected  Object loadData()
          Called by activate() to retrieve data stored by storeData(Object).
protected  void notify(int event, Flow flow)
           
static void notifyAll(Object key, Object message)
          Wakes up all flows waiting for the specified key.
 boolean passivate()
          Stores this process and all its flows in a location outside memory.
static Object receive(Object address)
          Listens for a single message sent to the specified address.
static Object receiveMany(Object address)
          Listens for multiple messages sent to the specified address.
 boolean removeEventListener(IProcessListener listener)
          Removes an event listener from this process.
static Process safeCurrent()
          Returns the current process, or throws an exception if there is no current process.
static void send(Object address, Object message)
          Sends a message to the specified address.
static IRequest serve(Object address)
          Listens for a single request sent to the specified address.
static IRequest serveMany(Object address)
          Listens for multiple requests sent to the specified address.
protected static String stateName(int state)
          Returns the name of the specified state; provided for debugging and diagnostic purposes.
protected  void storeData(Object data)
          Called by passivate() to store data on some media.
 int suspendedFlows()
          The number of suspended flows in this process.
static Object wait(Object key)
          Waits for a notification that matches the specified key.
static Object waitMany(Object key)
          Waits for multiple notifications that match the specified key.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

ACTIVE

public static final int ACTIVE
A constant indicating that the process is active. An active process has all its data in memory. It can run flows and allow new flows to join it.

See Also:
getState(), activate(), Constant Field Values

PASSIVE

public static final int PASSIVE
A constant indicating that the process is passive. A passive process has its data in external storage. The actual storage is defined by the Process subclass. A passive process can neither run flows nor allow new flows to join it.

See Also:
getState(), passivate(), Constant Field Values

state

protected int state
Constructor Detail

Process

public Process()
Creates a new process. The new process will belong to the default process manager. The process will contain no flow. To add a flow, it is necessary to call Flow.joinProcess(Process) specifying this process.


Process

public Process(ProcessManager manager)
Method Detail

stateName

protected static String stateName(int state)
Returns the name of the specified state; provided for debugging and diagnostic purposes.

Parameters:
state - The state to get the name from.
Returns:
A String containing the state name. Will never be null.

current

public static Process current()
Returns the current process. This method returns non-null if the current flow is running inside a process. Otherwise it returns null. If there is no current flow, this method also returns null.

Returns:
The current process, or null if the current flow is not running in the context of a process, or if there is no current flow.

safeCurrent

public static Process safeCurrent()
Returns the current process, or throws an exception if there is no current process. This method is similar to current(), except that it never returns null. In the absence of a current process, it throws an exception.

Returns:
The current process.
Throws:
IllegalStateException - If there is no current process.
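
The difference between current() and safeCurrent() can be modeled with plain JDK classes. The sketch below is not Lightwolf code: the CurrentContext class, its enter and leave methods, and the use of a ThreadLocal holding a String are all assumptions made for illustration. It only shows the nullable-getter versus throwing-getter contract described above.

```java
// Plain-JDK model of the current()/safeCurrent() contract; not Lightwolf code.
final class CurrentContext {
    // Hypothetical holder for the "current process" (a String stands in for Process).
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static void enter(String processName) { CURRENT.set(processName); }

    static void leave() { CURRENT.remove(); }

    // Mirrors current(): returns null when there is no current process.
    static String current() {
        return CURRENT.get();
    }

    // Mirrors safeCurrent(): never returns null, throws instead.
    static String safeCurrent() {
        String process = CURRENT.get();
        if (process == null) {
            throw new IllegalStateException("No current process.");
        }
        return process;
    }
}
```

Callers that can proceed without a process would check the result of current() for null, while callers that require one can use safeCurrent() and let the exception surface.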

wait

public static Object wait(Object key)
Waits for a notification that matches the specified key. The current flow is suspended until another flow invokes notifyAll(Object, Object) with a matching key.

The specified object is used to link wait and notify pairs. For example, the invocation

     Object result = Process.wait("ABC");
 
will wait until another flow invokes
     Process.notifyAll("ABC", "ResultOfABC");
 
This causes the first invocation to resume and assign "ResultOfABC" to the variable result.

If the key argument is not null, it must provide consistent behaviors for Object.equals(Object) and Object.hashCode(). While not an absolute requirement, it is strongly recommended to use an immutable object as the key.

Optionally, the key argument can be an instance of IMatcher. In this case, the matcher will not behave as a key, but as a selector. The following example illustrates this behavior:

     IMatcher matcher = new IMatcher() {
         // Declared public, as required when implementing an interface method.
         public boolean match(Object candidate) {
             if (!(candidate instanceof String)) { return false; }
             return ((String) candidate).startsWith("ABC");
         }
     };
     Object result = Process.wait(matcher);
 
The wait in the above example will resume for keys such as "ABCD" or "ABC123".

While waiting, the current flow is suspended and thus does not consume any thread. If the flow is resumed by means other than notifyAll(Object, Object), the effect is unpredictable and the process manager will be left in a corrupt state.

Parameters:
key - The key to wait for (may be null), or an IMatcher instance, as specified above.
Returns:
The message argument that was passed to notifyAll(Object, Object).
Throws:
IllegalStateException - If there is no current process.
See Also:
waitMany(Object), notifyAll(Object, Object), send(Object, Object), receive(Object)
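
The two ways a key can select a waiter (as an equality key or as a matcher) can be sketched as a single decision function. This is a plain-JDK model, not Lightwolf's implementation: the KeySelection class, its nested Matcher interface (a stand-in for IMatcher), and the wakes method are hypothetical names, and treating a null wait key as matching only a null notification key is an assumption.

```java
import java.util.Objects;

// Plain-JDK model of key-versus-matcher selection; not Lightwolf code.
final class KeySelection {
    // Hypothetical stand-in for IMatcher.
    interface Matcher {
        boolean match(Object candidate);
    }

    // Decides whether a waiter registered with waitKey is woken by notifyKey.
    static boolean wakes(Object waitKey, Object notifyKey) {
        if (waitKey instanceof Matcher) {
            // The matcher acts as a selector over notification keys.
            return ((Matcher) waitKey).match(notifyKey);
        }
        // Ordinary keys are compared with equals(); Objects.equals is null-safe.
        return Objects.equals(waitKey, notifyKey);
    }
}
```

With a matcher that accepts strings starting with "ABC", wakes(matcher, "ABC123") is true, whereas the plain key "ABC" is woken only by a notification key equal to "ABC".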

waitMany

public static Object waitMany(Object key)
Waits for multiple notifications that match the specified key. This method is similar to wait(Object), except that it may return multiple times and to concurrent flows. Every subsequent call to notifyAll(Object, Object) with a key that matches the specified key will cause this method to return. Whenever this method returns, it will be on a new flow. It never returns to the invoker's flow.

Parameters:
key - The key to wait for (may be null), or an IMatcher instance, as specified in wait(Object).
Returns:
The message argument that was passed to notifyAll(Object, Object).
Throws:
IllegalStateException - If there is no current process.
See Also:
wait(Object), notifyAll(Object, Object)

notifyAll

public static void notifyAll(Object key,
                             Object message)
Wakes up all flows waiting for the specified key. This method causes all previous invocations of wait(Object) and waitMany(Object) that match the specified key to return. The specified message is returned from those invocations. If more than one flow is resumed, they all get the same message instance, so either the message must be immutable or adequate synchronization must be used. If no flow is waiting for the specified key, invoking this method has no effect. For examples and more information, see the wait(Object) method.

Parameters:
key - The key that identifies which wait(Object) and waitMany(Object) invocations will be resumed.
message - The message to be sent to the resumed flows. It will be returned from the resumed wait(Object) and waitMany(Object) invocations.
Throws:
IllegalStateException - If there is no current process.
See Also:
wait(Object), waitMany(Object), send(Object, Object), receive(Object)
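
The broadcast behavior described above (every waiter receives the same message instance, and a notification with no waiters has no effect) can be modeled with CompletableFuture. This is a sketch under stated assumptions, not Lightwolf's implementation: KeyedBroadcast, await and broadcast are hypothetical names, futures stand in for suspended flows, and only the one-shot wait(Object) case is modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Plain-JDK model of keyed wait/notifyAll; not Lightwolf code.
final class KeyedBroadcast {
    // HashMap permits a null key, matching "key may be null".
    private final Map<Object, List<CompletableFuture<Object>>> waiters = new HashMap<>();

    // Registers a waiter; the future completes when a matching broadcast arrives.
    synchronized CompletableFuture<Object> await(Object key) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        waiters.computeIfAbsent(key, k -> new ArrayList<>()).add(future);
        return future;
    }

    // Completes every waiter on key with the same message; returns how many were woken.
    synchronized int broadcast(Object key, Object message) {
        List<CompletableFuture<Object>> woken = waiters.remove(key);
        if (woken == null) {
            return 0; // Nobody is waiting: the call has no effect.
        }
        for (CompletableFuture<Object> future : woken) {
            future.complete(message);
        }
        return woken.size();
    }
}
```

Because all waiters share one message reference, a mutable message would be visible to every resumed party at once, which is why the documentation asks for immutability or synchronization.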

send

public static void send(Object address,
                        Object message)
Sends a message to the specified address. If another flow is listening on that address, this method causes that flow to resume and then returns immediately. Otherwise, the current flow is suspended until some flow starts listening on the address.

The address is not a network address. It is an object used to link the sender and receiver flows. For example, the invocation

     Process.send("ABC", "MessageForABC");
 
will send the object "MessageForABC" to a flow that invokes
     Object result = Process.receive("ABC");
 
The above receive invocation will assign "MessageForABC" to the result variable.

If the address argument is not null, it must provide consistent behaviors for Object.equals(Object) and Object.hashCode(). While not an absolute requirement, it is strongly recommended to use an immutable object as the address.

While waiting, the current flow is suspended and thus does not consume any thread. If the flow is resumed by means other than a receive method, the effect is unpredictable and the process manager will be left in a corrupt state.

Parameters:
address - The address that identifies the listening flow.
message - The message to be sent.
Throws:
IllegalStateException - If there is no current process.
See Also:
receive(Object), serve(Object)
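
The rendezvous between send and receive (the sender waits until a receiver is listening on the same address) resembles a java.util.concurrent.SynchronousQueue per address. The sketch below is an analogy only and not Lightwolf code: Rendezvous and demo are hypothetical names, threads stand in for flows (so a blocked sender does occupy a thread here, unlike a suspended flow), and null addresses or messages are not supported by the JDK classes used.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;

// Thread-based analogy of send/receive on an address; not Lightwolf code.
final class Rendezvous {
    private final ConcurrentHashMap<Object, SynchronousQueue<Object>> channels =
            new ConcurrentHashMap<>();

    private SynchronousQueue<Object> channel(Object address) {
        return channels.computeIfAbsent(address, a -> new SynchronousQueue<>());
    }

    // Blocks until a receiver on the same address takes the message.
    void send(Object address, Object message) throws InterruptedException {
        channel(address).put(message);
    }

    // Blocks until a sender on the same address hands over a message.
    Object receive(Object address) throws InterruptedException {
        return channel(address).take();
    }

    // Sends "MessageForABC" from one thread and receives it on the calling thread.
    static Object demo() {
        Rendezvous rendezvous = new Rendezvous();
        Thread sender = new Thread(() -> {
            try {
                rendezvous.send("ABC", "MessageForABC");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            Object result = rendezvous.receive("ABC");
            sender.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "MessageForABC"
    }
}
```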

receive

public static Object receive(Object address)
Listens for a single message sent to the specified address. If another flow is blocked while sending a message to that address, this method causes that flow to resume and then immediately returns the sent message. Otherwise, the current flow is suspended until a message is sent to the address.

If another flow sends a message using call(Object, Object), this method will return an instance of IRequest that contains the sent message. The caller remains blocked until IRequest.response(Object) is invoked.

This method binds the current flow to the specified address. An address can have at most one flow bound to it. When this method returns (that is, when the message is received), the address will be free again.

While waiting, the current flow is suspended and thus does not consume any thread. If the flow is resumed by means other than a send method, the effect is unpredictable and the process manager will be left in a corrupt state.

For examples and more information, see the send(Object, Object) method.

Parameters:
address - The address on which this flow will be listening.
Returns:
The sent message, or an IRequest if the message was sent via call(Object, Object).
Throws:
AddressAlreadyInUseException - If another flow is listening on this address.
IllegalStateException - If there is no current process.
See Also:
send(Object, Object), serve(Object)
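
The rule that an address can have at most one bound listener, and becomes free again once the message is received, can be modeled with an atomic putIfAbsent. This is a plain-JDK sketch, not Lightwolf's implementation: AddressTable, bind and release are hypothetical names, a String identifies the listener, and IllegalStateException stands in for AddressAlreadyInUseException.

```java
import java.util.concurrent.ConcurrentHashMap;

// Plain-JDK model of one-listener-per-address binding; not Lightwolf code.
final class AddressTable {
    private final ConcurrentHashMap<Object, String> bound = new ConcurrentHashMap<>();

    // Binds the listener to the address, or fails if another listener holds it.
    void bind(Object address, String listener) {
        String previous = bound.putIfAbsent(address, listener);
        if (previous != null) {
            // Stand-in for AddressAlreadyInUseException.
            throw new IllegalStateException("Address already in use: " + address);
        }
    }

    // Models the moment the message is received: the address is free again.
    void release(Object address) {
        bound.remove(address);
    }
}
```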

receiveMany

public static Object receiveMany(Object address)
Listens for multiple messages sent to the specified address. This method is similar to receive(Object), except that it may return multiple times and to concurrent flows. For example, every subsequent call to a send method with the specified address will cause this method to return. Whenever this method returns, it will be on a new flow. It never returns to the invoker's flow.

The specified address will be bound to the invoker's flow until the current process finishes.

Parameters:
address - The address on which this flow will be listening.
Returns:
The sent message, or an IRequest if the message was sent via call(Object, Object).
Throws:
AddressAlreadyInUseException - If another flow is listening on this address.
IllegalStateException - If there is no current process.
See Also:
receive(Object), serveMany(Object)

serve

public static IRequest serve(Object address)
Listens for a single request sent to the specified address. If another flow is blocked while sending a message to that address, this method causes that flow to resume and then immediately returns a request containing the message. Otherwise, the current flow is suspended until a message is sent to the address.

If another flow sends a message using send(Object, Object), this method will cause that invocation to return, and then it will return an instance of IRequest that contains the sent message and requires no response.

This method binds the current flow to the specified address. An address can have at most one flow bound to it. When this method returns (that is, when the message is received), the address will be free again.

While waiting, the current flow is suspended and thus does not consume any thread. If the flow is resumed by means other than a send method, the effect is unpredictable and the process manager will be left in a corrupt state.

For examples and more information, see the call(Object, Object) method.

Parameters:
address - The address on which this flow will be listening.
Returns:
An IRequest containing the sent message.
Throws:
AddressAlreadyInUseException - If another flow is listening on this address.
IllegalStateException - If there is no current process.
See Also:
IRequest, call(Object, Object), serveMany(Object)

serveMany

public static IRequest serveMany(Object address)
Listens for multiple requests sent to the specified address. This method is similar to serve(Object), except that it may return multiple times and to concurrent flows. For example, every subsequent invocation of call(Object, Object) with the specified address will cause this method to return. Whenever this method returns, it will be on a new flow. It never returns to the invoker's flow.

The specified address will be bound to the invoker's flow until the current process finishes.

This method can be used to implement a very simple server.

Parameters:
address - The address on which this flow will be listening.
Returns:
An IRequest containing the sent message.
Throws:
AddressAlreadyInUseException - If another flow is listening on this address.
IllegalStateException - If there is no current process.
See Also:
serve(Object)
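
The request and response exchange behind a "very simple server" can be modeled with a small request object that carries the message and a slot for the reply. This is a plain-JDK sketch, not Lightwolf code: RequestSketch, its nested Request class (a stand-in for IRequest) and the handler parameter are hypothetical, and the handler runs on a JDK thread pool rather than on a new flow. If the handler throws, the reply is never completed and the caller in this sketch would wait forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-JDK model of call/serve with a request object; not Lightwolf code.
final class RequestSketch {
    // Hypothetical stand-in for IRequest: the message plus a slot for the reply.
    static final class Request {
        private final Object message;
        private final CompletableFuture<Object> reply = new CompletableFuture<>();

        Request(Object message) { this.message = message; }

        Object request() { return message; }

        void response(Object value) { reply.complete(value); }
    }

    // Models call(): hands a request to the server side and waits for the response.
    static Object call(Function<Object, Object> handler, Object message) {
        Request request = new Request(message);
        // Server side: each request is answered independently of the caller's thread.
        CompletableFuture.runAsync(() -> request.response(handler.apply(request.request())));
        // Client side: blocked until response(...) is invoked.
        return request.reply.join();
    }
}
```

Each invocation of call creates its own Request, which mirrors how every request delivered by serveMany is handled separately from the others.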

call

public static Object call(Object address,
                          Object message)
Sends a request to the specified address and waits for a response. This method is similar to send(Object, Object), except that it waits for a response from the listening flow. While waiting, the current flow is suspended.

The following example illustrates the call behavior:

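  // Flow, FlowMethod and IRequest are assumed to live in org.lightwolf alongside Process.
  import org.lightwolf.Flow;
  import org.lightwolf.FlowMethod;
  import org.lightwolf.IRequest;
  import org.lightwolf.Process; // Explicit import: otherwise Process resolves to java.lang.Process.

  public class Example implements Runnable {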

      @FlowMethod
      public void run() {
          // We must join a process.
          Flow.joinProcess(new Process());
          if (Flow.split(1) == 0) {
              // Here is the server.
              IRequest request = Process.serve("PeerA");
              System.out.println("Request: " + request.request());
              request.response("I'm fine.");
          } else {
              // Here is the client.
              Object response = Process.call("PeerA", "How are you?");
              System.out.println("Response: " + response);
          }
      }

      public static void main(String[] args) throws InterruptedException {
          Flow.submit(new Example());
          Thread.sleep(1000); // Wait for all flows to finish.
      }
  }
 
The above class prints the following:
  Request: How are you?
  Response: I'm fine.
 
If the address argument is not null, it must provide consistent behaviors for Object.equals(Object) and Object.hashCode(). While not an absolute requirement, it is strongly recommended to use an immutable object as the address.

While waiting, the current flow is suspended and thus does not consume any thread. If the flow is resumed by means other than a receive method, the effect is unpredictable and the process manager will be left in a corrupt state.

Parameters:
address - The address that identifies the listening flow.
message - The message to be sent.
Returns:
The listener's response.
Throws:
IllegalStateException - If there is no current process.
See Also:
receive(Object), serve(Object)

accept

public static Connection accept(Object matcher)

acceptMany

public static Connection acceptMany(Object matcher)

connect

public static Connection connect(Object matcher)

connectMany

public static Connection connectMany(Object matcher)

getState

public final int getState()
Returns an int whose value represents the state of this process.

See Also:
ACTIVE, PASSIVE

activeFlows

public final int activeFlows()
The number of active flows in this process. This number can vary quickly as flows join, leave, are suspended, and are resumed.


suspendedFlows

public final int suspendedFlows()
The number of suspended flows in this process. This number can vary quickly as flows join, leave, are suspended, and are resumed.


getFlows

public final Flow[] getFlows()

addEventListener

public final boolean addEventListener(IProcessListener listener)
Adds an event listener to this process.

Parameters:
listener - The listener to be added. Must not be null; otherwise, a NullPointerException is thrown.
Returns:
true if the listener was added, false if the specified listener had already been added before this invocation.
See Also:
IProcessListener, removeEventListener(IProcessListener)

removeEventListener

public final boolean removeEventListener(IProcessListener listener)
Removes an event listener from this process.

Parameters:
listener - The listener to be removed. Must not be null; otherwise, a NullPointerException is thrown.
Returns:
true if the listener was removed, false if the specified listener was not found in the internal list of listeners.
See Also:
addEventListener(IProcessListener)
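
The boolean results and null handling of the listener methods can be modeled with CopyOnWriteArrayList. This is a plain-JDK sketch, not Lightwolf's implementation: ListenerRegistry and its generic parameter are hypothetical, and it assumes that adding a listener that is already registered returns false without registering it twice.

```java
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-JDK model of add/remove listener semantics; not Lightwolf code.
final class ListenerRegistry<L> {
    private final CopyOnWriteArrayList<L> listeners = new CopyOnWriteArrayList<>();

    // Returns true if the listener was added, false if it was already present.
    boolean add(L listener) {
        Objects.requireNonNull(listener, "listener"); // throws NullPointerException on null
        return listeners.addIfAbsent(listener);
    }

    // Returns true if the listener was removed, false if it was not registered.
    boolean remove(L listener) {
        Objects.requireNonNull(listener, "listener");
        return listeners.remove(listener);
    }
}
```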

notify

protected void notify(int event,
                      Flow flow)

passivate

public final boolean passivate()
                        throws IOException
Stores this process and all its flows in a location outside memory. The actual storage location is defined by subclasses of Process.

If all flows of this process are suspended, this method stores all flow data in external storage and then returns true, indicating that the process was passivated. If this process is already passive when this method is called, it simply returns true without doing anything.

If storing the process data in external storage fails, this method throws IOException and the process is kept active.

If one or more flows of this process are active, the process is kept active and this method returns false.

Returns:
true if the process was passivated, false otherwise.
Throws:
IOException - If some error happens during access to storage.
See Also:
activate(), ACTIVE, PASSIVE
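
The three outcomes of passivate() described above can be modeled as a small state machine. This is a sketch under stated assumptions, not Lightwolf's implementation: PassivationModel, its Storage interface, the outcome helper and the numeric values of ACTIVE and PASSIVE are all hypothetical.

```java
import java.io.IOException;

// Plain-JDK model of the passivate() contract; not Lightwolf code.
final class PassivationModel {
    static final int ACTIVE = 1;  // hypothetical value
    static final int PASSIVE = 2; // hypothetical value

    // Hypothetical stand-in for the subclass-defined storage.
    interface Storage {
        void store(Object data) throws IOException;
    }

    private final Storage storage;
    private final int activeFlows;
    private int state = ACTIVE;

    PassivationModel(Storage storage, int activeFlows) {
        this.storage = storage;
        this.activeFlows = activeFlows;
    }

    int getState() { return state; }

    boolean passivate() throws IOException {
        if (state == PASSIVE) {
            return true;  // Already passive: nothing to do.
        }
        if (activeFlows > 0) {
            return false; // Some flow is still running: stay active.
        }
        storage.store("flow data"); // May throw; the state is still ACTIVE in that case.
        state = PASSIVE;
        return true;
    }

    // Runs one passivate() attempt and describes the outcome.
    static String outcome(int activeFlows, boolean storageFails) {
        PassivationModel model = new PassivationModel(data -> {
            if (storageFails) {
                throw new IOException("storage unavailable");
            }
        }, activeFlows);
        try {
            return model.passivate() ? "passivated" : "kept active";
        } catch (IOException e) {
            return model.getState() == ACTIVE ? "failed, still active" : "failed, passive";
        }
    }
}
```

The state changes only after the store call succeeds, which is one way to guarantee that a storage failure leaves the process active.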

activate

public final void activate()
                    throws IOException,
                           ClassNotFoundException
Reloads this process and all its flows from external storage back to memory. If this process is already active, this method does nothing. Otherwise, this method does the opposite of passivate().

Throws:
IOException - If some error happens during access to storage.
ClassNotFoundException - If the class of a serialized object is not found.
See Also:
passivate(), ACTIVE, PASSIVE

storeData

protected void storeData(Object data)
                  throws IOException
Called by passivate() to store data on some media. The default implementation throws IllegalStateException indicating that the process cannot be passivated.

This method must be implemented along with loadData() and discardData() by subclasses that support long-running processes. The implementation must associate the specified data with this process. This method might be called more than once; any data stored by a previous invocation of this method must be discarded.

This method typically serializes the specified data object, so that a deserialized copy is returned by the next call to loadData().

Parameters:
data - A serializable object to be stored on some media.
Throws:
IOException - If some error happens during access to storage.

loadData

protected Object loadData()
                   throws IOException,
                          ClassNotFoundException
Called by activate() to retrieve data stored by storeData(Object). The default implementation throws AssertionError.

This method must be implemented along with storeData(Object) and discardData() by subclasses that support long-running processes. This method must not discard data from the storage.

Returns:
A serializable object that was read from storage.
Throws:
IOException - If some error happens during access to storage.
ClassNotFoundException - If the system can't find a class for a serialized object while assembling the result.

discardData

protected void discardData()
                    throws IOException
Called to indicate that the stored data is not necessary anymore. The default implementation throws AssertionError.

This method must be implemented along with storeData(Object) and loadData() by subclasses that support long-running processes. This method must simply discard data from the storage.

Throws:
IOException - If some error happens during the storage access.
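
The storage contract shared by storeData(Object), loadData() and discardData() can be sketched with standard Java serialization and a single file. This is a standalone illustration, not a Process subclass and not a description of how FileProcess works: FileStore, its constructor argument and the roundTrip helper are hypothetical. In a real subclass these three methods would be protected overrides.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone model of the store/load/discard contract; not Lightwolf code.
final class FileStore {
    private final Path file;

    FileStore(Path file) { this.file = file; }

    // Overwrites anything stored by a previous invocation (default options truncate the file).
    void storeData(Object data) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(data);
        }
    }

    // Reads the stored object back without discarding it from storage.
    Object loadData() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return in.readObject();
        }
    }

    // Discards the stored data.
    void discardData() throws IOException {
        Files.deleteIfExists(file);
    }

    // Stores twice, loads once, then discards; returns what was loaded.
    static Object roundTrip(Object data) {
        try {
            FileStore store = new FileStore(Files.createTempFile("process", ".ser"));
            store.storeData("stale"); // replaced by the next call
            store.storeData(data);
            Object loaded = store.loadData();
            store.discardData();
            return loaded;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling roundTrip("payload") returns an object equal to "payload", and the earlier "stale" value is gone because the second storeData call replaced it.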