|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object org.lightwolf.Process
public class Process
An unit of work composed by a set of related flows. A process is a manageable unit of work composed by one or more flows. It can be monitored, paused or interrupted. If certain conditions are met (such as using only primitives and serializable objects), a process can also be serialized and deserialized, which allows development of long running processes in pure Java language.
This class contains utilities for communication and synchronization between process flows:
wait
and notify
methods can be used to wait-for and get/provide information about
specific conditions and events.send
and receive
methods allow safe delivery of one-way messages.call
and serve
methods provide a simple request-response mechanism.Flow.joinProcess(Process)
.
This class can be subclassed. If the implementor wish to provide support for
long-running-process, the methods storeData(Object)
,
loadData()
and discardData()
must all be implemented.
Flow
,
Serialized FormField Summary | |
---|---|
static int |
ACTIVE
A constant indicating that the process is active. |
static int |
PASSIVE
A constant indicating that the process is passive. |
protected int |
state
|
Constructor Summary | |
---|---|
Process()
Creates a new process. |
|
Process(ProcessManager manager)
|
Method Summary | |
---|---|
static Connection |
accept(Object matcher)
|
static Connection |
acceptMany(Object matcher)
|
void |
activate()
Reloads this process and all its flows from external storage back to memory. |
int |
activeFlows()
The number of active flows in this process. |
boolean |
addEventListener(IProcessListener listener)
Adds an event listener to this process. |
static Object |
call(Object address,
Object message)
Sends a request to the informed address and waits for a response. |
static Connection |
connect(Object matcher)
|
static Connection |
connectMany(Object matcher)
|
static Process |
current()
Returns the current process. |
protected void |
discardData()
Called to indicate that the stored data is not necessary anymore. |
Flow[] |
getFlows()
|
int |
getState()
Return an int whose value represents this process state. |
protected Object |
loadData()
Called by activate() to retrieve data stored by
storeData(Object) . |
protected void |
notify(int event,
Flow flow)
|
static void |
notifyAll(Object key,
Object message)
Wakes-up all flows awaiting for the specified key. |
boolean |
passivate()
Stores this process and all its flows on a place outside memory. |
static Object |
receive(Object address)
Listens for a single message sent to the informed address. |
static Object |
receiveMany(Object address)
Listens for multiple messages sent to the informed address. |
boolean |
removeEventListener(IProcessListener listener)
Removes an event listener from this process. |
static Process |
safeCurrent()
Returns the current process, or throws an exception if there is no current process. |
static void |
send(Object address,
Object message)
Sends a message to the informed address. |
static IRequest |
serve(Object address)
Listens for a single request sent to the informed address. |
static IRequest |
serveMany(Object address)
Listens for multiple messages sent to the informed address. |
protected static String |
stateName(int state)
Return the name of the specified state; provided for debugging and diagnostic purposes. |
protected void |
storeData(Object data)
Called by passivate() to store data on some media. |
int |
suspendedFlows()
The number of suspended flows in this process. |
static Object |
wait(Object key)
Waits for a notification that matches the informed key. |
static Object |
waitMany(Object key)
Waits for multiple notifications that matches the informed key. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final int ACTIVE
getState()
,
activate()
,
Constant Field Valuespublic static final int PASSIVE
Process
subclass. It can't run flows nor allow new flows to
join it.
getState()
,
passivate()
,
Constant Field Valuesprotected int state
Constructor Detail |
---|
public Process()
Flow.joinProcess(Process)
specifying this process.
public Process(ProcessManager manager)
Method Detail |
---|
protected static String stateName(int state)
state
- The state to get the name from.
null
.public static Process current()
null
. If there is no current flow, this
method also returns null
.
null
if the current flow is
not running in the context of a process, or if there is no
current flow.public static Process safeCurrent()
current()
, except in
that it never returns null
. In the absence of a current
process, it will throw an exception.
IllegalStateException
- If there is no current process.public static Object wait(Object key)
notifyAll(Object, Object)
.
The specified object is used to link wait
and
notify
pairs. For example, the invocation
Object result = Process.wait("ABC");will wait until another flow invokes
Process.notifyAll("ABC", "ResultOfABC");, which will cause the first code to resume and assign
"ResultOfABC"
to the variable result
.
If the key
argument is not null
, it must
provide consistent behaviors for Object.equals(Object)
and
Object.hashCode()
. While not an absolute requirement, it is
strongly recommended to use an immutable object as the key
.
Optionally, the key
argument can be an instance of
IMatcher
. In this case, the matcher will not behave as a key, but
as a selector. The following example illustrates this behavior:
IMatcher matcher = new IMatcher() { boolean match(Object candidate) { if (!(candidate instanceof String)) { return false; } return ((String) candidate).startsWith("ABC"); } }; Object result = Process.wait(matcher);The wait in the above example will resume for keys such as
"ABCD"
or "ABC123"
.
While waiting, the current flow will be suspended and thus will not
consume any thread. If such flow is resumed by means other than
notifyAll(Object, Object)
, the effect is unpredictable and the
process manager will be corrupt.
key
- The key to wait for (may be null
), or an
IMatcher
instance, as above specified.
message
argument that was passed to
notifyAll(Object, Object)
.
IllegalStateException
- If there is no current process.waitMany(Object)
,
notifyAll(Object, Object)
,
send(Object, Object)
,
receive(Object)
public static Object waitMany(Object key)
wait(Object)
, except in that it may return
multiple times and to concurrent flows. Every subsequent call to
notifyAll(Object, Object)
with a key that matches the informed
key will cause this method to return. Whenever this method returns, it
will be on a new flow. It never returns to the invoker's flow.
key
- The key to wait for (may be null
), or an
IMatcher
instance, as specified on wait(Object)
.
message
argument that was passed to
notifyAll(Object, Object)
.
IllegalStateException
- If there is no current process.wait(Object)
,
notifyAll(Object, Object)
public static void notifyAll(Object key, Object message)
wait(Object)
and
waitMany(Object)
, that matches the informed key, to return. The
informed message is returned in such invocations. If more than one flow
is resumed, they all get the same message instance, so either the message
must be immutable, or adequate synchronization must be used. If there is
no flow awaiting for the specified key, invoking this method has no
effect. For examples and more information, see the wait(Object)
method.
key
- The key that identifies which wait(Object)
and
waitMany(Object)
invocations will be resumed.message
- The message to be sent to the resumed flows. It will be
returned from the resumed wait(Object)
and
waitMany(Object)
invocations.
IllegalStateException
- If there is no current process.wait(Object)
,
waitMany(Object)
,
send(Object, Object)
,
receive(Object)
public static void send(Object address, Object message)
The informed address is not a network address. It is an object used to link the sender and receiver flows. For example, the invocation
Process.send("ABC", "MessageForABC");will send the object "MessageForABC" to a flow that invokes
Object result = Process.receive("ABC");The above
receive
invocation will assign "MessageForABC" to
the result
variable.
If the address
argument is not null
, it must
provide consistent behaviors for Object.equals(Object)
and
Object.hashCode()
. While not an absolute requirement, it is
strongly recommended to use an immutable object as the
address
.
While waiting, the current flow will be suspended and thus will not
consume any thread. If such flow is resumed by means other than a
receive
method, the effect is unpredictable and the process
manager will be corrupt.
address
- The address that identifies the listening flow.message
- The message to be sent.
IllegalStateException
- If there is no current process.receive(Object)
,
serve(Object)
public static Object receive(Object address)
If another flow sends a message using call(Object, Object)
, this
method will return an instance of IRequest
that contains the sent
message. Such call will be blocked until invocation of
IRequest.response(Object)
.
This method binds the current flow to the informed address. An address can have at most one flow bound to it. When this method returns (that is, when the message is received), the address will be free again.
While waiting, the current flow will be suspended and thus will not
consume any thread. If such flow is resumed by means other than a
send
method, the effect is unpredictable and the process
manager will be corrupt.
For examples and more information, see the send(Object, Object)
method.
address
- The address on which this flow will be listening.
IRequest
if the message was sent
via call(Object, Object)
.
AddressAlreadyInUseException
- If another flow is listening on this
address.
IllegalStateException
- If there is no current process.send(Object, Object)
,
serve(Object)
public static Object receiveMany(Object address)
receive(Object)
, except in that it may return
multiple times and to concurrent flows. For example, every subsequent
call to a send
method with the informed
address will cause this method to return. Whenever this method returns,
it will be on a new flow. It never returns to the invoker's flow.
The informed address will be bound to the invoker's flow until the current process finishes.
address
- The address on which this flow will be listening.
IRequest
if the message was sent
via call(Object, Object)
.
AddressAlreadyInUseException
- If another flow is listening on this
address.
IllegalStateException
- If there is no current process.receive(Object)
,
serveMany(Object)
public static IRequest serve(Object address)
If another flow sends a message using send(Object, Object)
, this
method will cause such invocation to return, and then it will return an
instance of IRequest
that contains the sent message and requires
no response.
This method binds the current flow to the informed address. An address can have at most one flow bound to it. When this method returns (that is, when the message is received), the address will be free again.
While waiting, the current flow will be suspended and thus will not
consume any thread. If such flow is resumed by means other than a
send
method, the effect is unpredictable and the process
manager will be corrupt.
For examples and more information, see the call(Object, Object)
method.
address
- The address on which this flow will be listening.
IRequest
containing the sent message.
AddressAlreadyInUseException
- If another flow is listening on this
address.
IllegalStateException
- If there is no current process.IRequest
,
call(Object, Object)
,
serveMany(Object)
public static IRequest serveMany(Object address)
serve(Object)
, except in that it may return
multiple times and to concurrent flows. For example, every subsequent
invocation to call(Object, Object)
with the informed address
will cause this method to return. Whenever this method returns, it will
be on a new flow. It never returns to the invoker's flow.
The informed address will be bound to the invoker's flow until the current process finishes.
This method can be used to implement a very simple server.
address
- The address on which this flow will be listening.
IRequest
containing the sent message.
AddressAlreadyInUseException
- If another flow is listening on this
address.
IllegalStateException
- If there is no current process.serve(Object)
public static Object call(Object address, Object message)
send(Object, Object)
, except in that it
waits for a response from the listening flow. While waiting, the
current flow is
suspended.
The following example illustrates the call behavior:
public class Example implements Runnable {
@FlowMethod
public void run() {
// We must join a process.
Flow.joinProcess(new Process());
if (Flow.split(1) == 0) {
// Here is the server.
IRequest request = Process.serve("PeerA");
System.out.println("Request: " + request.request());
request.response("I'm fine.");
} else {
// Here is the client.
Object response = Process.call("PeerA", "How are you?");
System.out.println("Response: " + response);
}
}
public static void main(String[] args) throws InterruptedException {
Flow.submit(new Example());
Thread.sleep(1000); // Wait for all flows to finish.
}
}
The above class prints the following:
Request: How are you? Response: I'm fine.If the
address
argument is not null
, it must
provide consistent behaviors for Object.equals(Object)
and
Object.hashCode()
. While not an absolute requirement, it is
strongly recommended to use an immutable object as the
address
.
While waiting, the current flow will be suspended and thus will not
consume any thread. If such flow is resumed by means other than a
receive
method, the effect is unpredictable and the process
manager will be corrupt.
address
- The address that identifies the listening flow.message
- The message to be sent.
IllegalStateException
- If there is no current process.receive(Object)
,
serve(Object)
public static Connection accept(Object matcher)
public static Connection acceptMany(Object matcher)
public static Connection connect(Object matcher)
public static Connection connectMany(Object matcher)
public final int getState()
int
whose value represents this process state.
ACTIVE
,
PASSIVE
public final int activeFlows()
public final int suspendedFlows()
public final Flow[] getFlows()
public final boolean addEventListener(IProcessListener listener)
listener
- The listener to be added. Must not be null
otherwise a NullPointerException
is thrown.
true
if the listener was added, false
if the informed listener was added this invocation.IProcessListener
,
removeEventListener(IProcessListener)
public final boolean removeEventListener(IProcessListener listener)
listener
- The listener to be removed. Must not be null
otherwise a NullPointerException
is thrown.
true
if the listener was removed, false
if the informed listener was not found in the internal list of
listeners.addEventListener(IProcessListener)
protected void notify(int event, Flow flow)
public final boolean passivate() throws IOException
Process
.
If all of this process flows are suspended, this method stores all flow
data on an external storage and then returns true
,
indicating that the process was passivated. If this process was already
passive when this method is called, it simply returns true
doing nothing.
If the task of storing process data on the external storage fails, this
method throws IOException
and the process is kept active.
If one or more of this process flows is active, the process is kept active and this method returns
false
.
true
if the process was passivated,
false
otherwise.
IOException
- If some error happens during access to storage.activate()
,
ACTIVE
,
PASSIVE
public final void activate() throws IOException, ClassNotFoundException
passivate()
.
IOException
- If some error happens during access to storage.
ClassNotFoundException
- If the class is notpassivate()
,
ACTIVE
,
PASSIVE
protected void storeData(Object data) throws IOException
passivate()
to store data on some media. The default
implementation throws IllegalStateException
indicating that the
process cannot be passivated.
This method must be implemented along with loadData()
and
discardData()
by subclasses that support long-running-processes.
The implementation must associate the informed data with this process.
This method might be called more than once. Any data stored by the
previous invocation to this method must be discarded.
This method typically serializes the informed data
object,
so that an unserialized version is returned by the next call to
loadData()
.
data
- A serializable object to be stored on some media.
IOException
- If some error happens during access to storage.protected Object loadData() throws IOException, ClassNotFoundException
activate()
to retrieve data stored by
storeData(Object)
. The default implementation throws
AssertionError
.
This method must be implemented along with storeData(Object)
and
discardData()
by subclasses that support long-running-processes.
This method must not discard data from the storage.
IOException
- If some error happens during access to storage.
ClassNotFoundException
- If the system can't find a class for a
serialized object while assembling the result.protected void discardData() throws IOException
AssertionError
.
This method must be implemented along with storeData(Object)
and
loadData()
by subclasses that support long-running-processes.
This method must simply discard data from the storage.
IOException
- If some error happens during the storage access.
|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |