We’ll now conclude the discussion of the NIO package we began in Chapter 12 by talking about nonblocking and selectable network communications. All our server examples in this chapter thus far have used a thread-bound pattern (one thread per I/O operation). In Java, this is very natural because of the ease with which we can create threads. It’s also very efficient, within limits. Problems arise when you try to build very large-scale servers using this style of client handling. While on a large machine it’s certainly possible to have hundreds or even thousands of threads (especially if they’re mostly idle, waiting for I/O), this is a resource-hungry solution. Every thread you start in Java consumes memory for its internal stack, and the performance of managing this number of threads is highly system-dependent.
An alternative approach is to take a lesson from the old, dark days before threading was available and use nonblocking I/O operations to manage numerous communications from a single thread. Better yet, the server we build in this section combines that single selecting thread with a configurable pool of worker threads, taking advantage of machines with many processors.
At the heart of this process is the concept of selectable I/O. It’s not good enough to simply have nonblocking I/O operations if you have no way to efficiently poll for work to be done. The NIO package provides for efficient polling using selectable channels. A selectable channel allows for the registration of a special kind of listener called a selector that can check the readiness of the channel for operations, such as reading and writing or accepting or creating network connections.
The selector and the selection process are not typical Java listeners of the kind we’ll see elsewhere in this book, but instead rather slavishly follow the conventions of C language systems. This is mainly for performance reasons; because this API is primarily intended for high-volume servers, it is bound very tightly to the traditional, underlying operating system facilities with less regard for ease of use. This, combined with the other details of using the NIO package, means that this section is somewhat dense and the server we create here is one of the longer and more complex examples in the book. Don’t be discouraged if you are a bit put off by this section. You can use the general techniques earlier in this chapter for most applications and reserve this knowledge for creating services that handle the very highest volumes of simultaneous client requests.
A selectable channel extends the abstract SelectableChannel
class, which specifies that the channel can be set to a nonblocking
mode and that it supports the select API that makes efficient polling
possible. The primary implementations of selectable channels are those
for working with the network: SocketChannel,
ServerSocketChannel,
and DatagramChannel. The
only other selectable channels in the standard API are the two ends of a
Pipe, Pipe.SourceChannel and Pipe.SinkChannel (which can be used
in an analogous way for intra-VM communication).
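A quick way to see which operations each of these channels supports is the validOps() method that every SelectableChannel provides. The sketch below opens one channel of each standard type and collects the bit masks. The class and method names are ours, chosen for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.*;

public class ValidOpsDemo {
    // Returns validOps() for, in order: SocketChannel, ServerSocketChannel,
    // DatagramChannel, Pipe.SourceChannel, Pipe.SinkChannel.
    public static int[] validOpsOfEachType() {
        try (SocketChannel sc = SocketChannel.open();
             ServerSocketChannel ssc = ServerSocketChannel.open();
             DatagramChannel dc = DatagramChannel.open()) {
            Pipe pipe = Pipe.open();   // Pipe itself is not Closeable; close its ends
            try {
                return new int[] {
                    sc.validOps(), ssc.validOps(), dc.validOps(),
                    pipe.source().validOps(), pipe.sink().validOps()
                };
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] names = { "SocketChannel", "ServerSocketChannel", "DatagramChannel",
                           "Pipe.SourceChannel", "Pipe.SinkChannel" };
        int[] ops = validOpsOfEachType();
        for (int i = 0; i < ops.length; i++)
            System.out.println(names[i] + " validOps = " + ops[i]);
    }
}
```

We would expect SocketChannel to report OP_CONNECT, OP_READ, and OP_WRITE; ServerSocketChannel only OP_ACCEPT; DatagramChannel OP_READ and OP_WRITE; and the two pipe ends OP_READ and OP_WRITE, respectively.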
At the heart of the process is the Selector object, which knows about a
particular set of selectable channels and provides a select() method for
determining their readiness for I/O operations. Conceptually, the
process is simple; you register one or more channels with a selector and
then poll it, asking it to tell you which set of channels is ready to
go. In actuality, there are a few additional pieces involved.
First, the Selector does not
work directly with channels but instead operates on SelectionKey objects. A
SelectionKey object is created
implicitly when the channel is registered with the Selector. It encapsulates the selectable
channel as well as information about what types of operations (e.g.,
read, write) we are interested in waiting for. That information is held
in the SelectionKey in a set of flags
called the interest set, which can be changed by
the application at any time. SelectionKeys are also used to return the
results of a select operation. Each
call to select() returns the number
of SelectionKeys that are ready for
some type of I/O. The keys are then retrieved with the selectedKeys() method.
Each key also has a set of flags called the ready
set that indicates which operation of interest is actually
ready (possibly more than one). For example, a SelectionKey interest set might indicate that
we want to know when its channel is ready for reading or writing. After
a select operation, if that key is in the set returned by the selector,
we know that it is ready for one or more of those operations, and we can
check the key’s ready set to find out which one.
Before we go on, we should say that although we have been saying that channels are registered with selectors, the API is (confusingly) the other way around: the register() method belongs to the channel, and the selector is passed to it as an argument. It’s better to mentally spackle over this and keep thinking of the channels as registered with the selector.
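To make the direction of the call concrete, here is a minimal sketch that uses a Pipe (so it needs no network) and checks that the key returned by the channel’s register() method refers back to both the channel and the selector. RegisterDemo is a hypothetical name used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterDemo {
    // Registers a pipe's source channel and verifies the key links both objects.
    public static boolean keyLinksChannelAndSelector() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);               // required before register()
            // The call is made on the channel, with the selector as the argument
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            boolean linked = key.channel() == source
                && key.selector() == selector
                && source.keyFor(selector) == key          // the channel remembers its key
                && source.isRegistered();
            source.close();
            pipe.sink().close();
            return linked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("key links channel and selector: " + keyLinksChannelAndSelector());
    }
}
```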
A Selector object is
created using the Selector.open() method
(Selector uses a factory
pattern):
Selector selector = Selector.open();
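To register one or more channels with the selector, first set them to nonblocking mode (register() throws an IllegalBlockingModeException for a channel that is still in blocking mode):
SelectableChannel channelA = // ...
channelA.configureBlocking(false);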
Next, register the channels:
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey key = channelA.register(selector, interestOps);
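When we register the channel, we have an opportunity to set the
initial interest operations (or “interest ops”). These are defined by
constant fields in the SelectionKey
class: OP_READ (ready to read), OP_WRITE (ready to write), OP_CONNECT
(a client socket is ready to finish connecting), and OP_ACCEPT (a server
socket is ready to accept a new connection).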
These fields are bit flags; you can combine them with the bitwise OR operator (|), as in this example, to express interest in more than one type of operation.
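As a small illustration of the bit-flag style, the following sketch (the class InterestOpsDemo is hypothetical) decodes an ops value into names and shows that a channel rejects interest ops outside its validOps(): register() throws IllegalArgumentException in that case. A Pipe’s source end is used because it only supports reading.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class InterestOpsDemo {
    // Turns an interest or ready set into a readable list of flag names.
    public static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT ");
        return sb.toString().trim();
    }

    // A pipe's source can only be read, so asking for OP_WRITE is rejected.
    public static boolean rejectsUnsupportedOp() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            try {
                pipe.source().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                return false;
            } catch (IllegalArgumentException expected) {
                return true;
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(ops + " -> " + describe(ops));   // prints "5 -> READ WRITE"
        System.out.println("rejects OP_WRITE on pipe source: " + rejectsUnsupportedOp());
    }
}
```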
The result of the register() method is a
SelectionKey object. We can use the
key to change the interest ops at any time with the SelectionKey interestOps(int) method or to
unregister the channel from the Selector with the key’s cancel() method.
This same key is also returned as the result of selection
operations when its channel is ready. When the SelectionKey is returned, its ready set holds
flags for the operations that the channel is ready to perform without blocking. We can retrieve
the value of the flags with the readyOps() method.
Convenience methods are available to test for each operation in the
ready set: isReadable(),
isWritable(),
isConnectable(), and
isAcceptable().
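Here is a minimal end-to-end sketch of the ready set, again using a Pipe in place of a socket so that it is self-contained: we register the pipe’s source channel for reading, write a byte into the sink, select, and look at the key’s readyOps(). The class name ReadySetDemo is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class ReadySetDemo {
    // Returns the ready set of the source's key after one select(),
    // or 0 if the key was not selected.
    public static int readyOpsAfterWrite() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[] { 42 }));  // data now waiting

            selector.select(1000);                   // returns once the source is ready
            int ready = selector.selectedKeys().contains(key) ? key.readyOps() : 0;
            // isReadable() is shorthand for (key.readyOps() & SelectionKey.OP_READ) != 0
            System.out.println("isReadable() = " + key.isReadable());

            pipe.source().close();
            pipe.sink().close();
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("readyOps = " + readyOpsAfterWrite());
    }
}
```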
Depending on how you structure your application, it may not be
necessary to save the SelectionKey at
registration time. In our example, we let the Selector keep track of
the keys for us, simply using them when they are ready. In fact, we go
even further and put the SelectionKey
to work by asking it to hold a reference for us! The SelectionKey attach() method is a convenience
method that can attach an arbitrary object to the key for use by our
application. We show how this can be useful in a bit.
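A sketch of the attachment idea, assuming a hypothetical ConnectionState class that stands in for whatever per-channel object an application keeps. Besides attach(), register() also has a three-argument form that attaches the object at registration time; attach() returns the previous attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class AttachDemo {
    // Hypothetical per-channel state we want to find again from the key alone.
    static final class ConnectionState {
        final String name;
        ConnectionState(String name) { this.name = name; }
    }

    public static String attachRoundTrip() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            // Attach the state in the same call that registers the channel
            SelectionKey key = pipe.source().register(
                selector, SelectionKey.OP_READ, new ConnectionState("client-1"));

            // attach() replaces the attachment and hands back the old one
            Object previous = key.attach(new ConnectionState("client-2"));
            String result = ((ConnectionState) previous).name + "->"
                + ((ConnectionState) key.attachment()).name;

            pipe.source().close();
            pipe.sink().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(attachRoundTrip());   // prints "client-1->client-2"
    }
}
```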
After one or more channels are registered with the Selector, we can perform a select operation using one of its select()
methods:
intreadyCount=selector.select();
Without arguments, the method blocks until at least one channel is
ready for some operation or until the Selector’s wakeup() method is
called. Alternatively, you can use the form of select() that takes a timeout (in
milliseconds) and waits at most that long for a ready channel. There is
also selectNow(), which
always returns immediately. Both of these methods also return the
number of ready channels.
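The following sketch (the class SelectVariantsDemo is hypothetical) exercises the three forms on a selector with no channels, so nothing can ever become ready. It also shows a detail of wakeup(): if no selection is in progress when it is called, the next select() returns immediately instead of blocking.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

public class SelectVariantsDemo {
    // Returns { selectNow() count, select(50) count, ms spent in select() after wakeup() }.
    public static long[] run() {
        try (Selector selector = Selector.open()) {     // no channels registered
            int now = selector.selectNow();             // returns at once with 0
            int timed = selector.select(50);            // waits up to 50 ms, returns 0

            // wakeup() with no select in progress makes the *next* select() return at once
            selector.wakeup();
            long start = System.nanoTime();
            selector.select();                          // would otherwise block forever
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;

            return new long[] { now, timed, elapsedMs };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("selectNow=" + r[0] + " select(50)=" + r[1]
            + " select() after wakeup took " + r[2] + " ms");
    }
}
```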
You can use select() and
wakeup() somewhat like wait() and notify(). The wakeup is necessary because once
a selection is started, it will not see any changes to its keys’
interest ops until the next invocation. If another thread changes the
interest ops, it must use wakeup() to
prompt the selecting thread to select() again. The Selector is also heavily synchronized; for
example, calls to register new channels block until the select is
finished. Often it’s much easier to simply use select with a short timeout and a loop, like
this:
while (selector.select(50) == 0);
However, if another thread is allowed to change the interest ops,
you still need to use wakeup() to
maximize throughput. Otherwise, in the worst case, you could end up
waiting the full select wait period
on every iteration, even when there is work to be done.
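One common way to follow this rule, sketched below, is for other threads never to touch interest ops directly, but instead to queue the change and call wakeup(); the selecting thread applies queued changes itself before each select(). This is an alternative idiom, not the approach LargerHttpd uses later (there, worker threads set the interest ops themselves and then call wakeup()). The names are hypothetical, and a Pipe stands in for a socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WakeupDemo {
    // Returns true once the selecting thread sees the source become readable
    // after another thread queued an OP_READ change and called wakeup().
    public static boolean otherThreadEnablesRead() {
        Queue<Runnable> pendingChanges = new ConcurrentLinkedQueue<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.sink().write(ByteBuffer.wrap(new byte[] { 1 }));   // data is waiting
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, 0); // no interest yet

            Thread other = new Thread(() -> {
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
                pendingChanges.add(() -> key.interestOps(SelectionKey.OP_READ));
                selector.wakeup();   // without this, select() below could block forever
            });
            other.start();

            boolean readable = false;
            while (!readable) {
                Runnable change;
                while ((change = pendingChanges.poll()) != null)
                    change.run();                 // apply changes on the selecting thread
                selector.select();                // blocks until ready or woken up
                readable = selector.selectedKeys().remove(key) && key.isReadable();
            }
            other.join();
            pipe.source().close();
            pipe.sink().close();
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("readable after wakeup: " + otherThreadEnablesRead());
    }
}
```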
Next, we can get the set of ready channels from the Selector with the selectedKeys() method
and iterate through them, doing whatever our application
dictates:
Set<SelectionKey> readySet = selector.selectedKeys();
for (Iterator<SelectionKey> it = readySet.iterator(); it.hasNext();) {
    SelectionKey key = it.next();
    it.remove();  // remove the key from the ready set
    // use the key
}
The ready set is returned to us as a java.util.Set, which we walk through with an
Iterator (see Chapter 1). One important thing to note is that
we’ve used the Iterator’s remove() method to remove the key from the
ready set. The select() methods add
keys only to the ready set or add flags to keys already in the set; they
never remove them, so we must clear the keys when we handle them. You
can get the full set of keys a Selector is managing with the keys() method, but you should not attempt to
remove keys from that set; use the cancel() method on individual keys instead. Or
you can close the entire Selector
with its close() method,
unregistering all its keys.
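The next sketch makes two pipe channels readable, then drains the selected-key set in the same way, removing each key through the iterator and consuming its data so that the key is not reported again. The class name SelectedKeysDemo is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class SelectedKeysDemo {
    // Returns { keys handled, keys left in the selected-key set afterward }.
    public static int[] drainSelectedKeys() {
        try (Selector selector = Selector.open()) {
            Pipe[] pipes = { Pipe.open(), Pipe.open() };
            for (Pipe p : pipes) {
                p.source().configureBlocking(false);
                p.source().register(selector, SelectionKey.OP_READ);
                p.sink().write(ByteBuffer.wrap(new byte[] { 7 }));  // make each source readable
            }

            int handled = 0;
            ByteBuffer buf = ByteBuffer.allocate(16);
            while (handled < pipes.length && selector.select(1000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();                          // the selector never does this for us
                    buf.clear();
                    ((ReadableByteChannel) key.channel()).read(buf);  // consume the data
                    handled++;
                }
            }
            int left = selector.selectedKeys().size();
            for (Pipe p : pipes) { p.source().close(); p.sink().close(); }
            return new int[] { handled, left };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = drainSelectedKeys();
        System.out.println("handled=" + r[0] + " left=" + r[1]);
    }
}
```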
Let’s put this information to use. In this section, we’ll
create the big brother of TinyHttpd (our minimalist web server)
called LargerHttpd. The LargerHttpd server is a nonblocking web server
that uses SocketChannels and a
pool of threads to service requests. In this example, a single thread
executes a main loop that accepts new connections and checks the
readiness of existing client connections for reading or writing.
Whenever a client needs attention, the main loop places a job in a queue where a
thread from our thread pool waits to service it. As we said, this
example is a bit longer than we would like, but it is really the minimum
that is necessary to show a realistic usage of the APIs:
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.regex.*;

public class LargerHttpd {
    Selector clientSelector;

    public void run(int port, int threads) throws IOException {
        clientSelector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        InetSocketAddress sa =
            new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        ssc.bind(sa);
        ssc.register(clientSelector, SelectionKey.OP_ACCEPT);

        Executor executor = Executors.newFixedThreadPool(threads);

        while (true) {
            try {
                while (clientSelector.select(100) == 0);
                Set<SelectionKey> readySet = clientSelector.selectedKeys();
                for (Iterator<SelectionKey> it = readySet.iterator(); it.hasNext();) {
                    final SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        acceptClient(ssc);
                    } else {
                        // Park the key until the connection re-arms its interest ops
                        key.interestOps(0);
                        executor.execute(() -> {
                            try {
                                handleClient(key);
                            } catch (IOException e) {
                                System.out.println(e);
                                // Don't leave a failed client parked forever
                                key.cancel();
                                try { key.channel().close(); } catch (IOException ignored) { }
                            }
                        });
                    }
                }
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }

    void acceptClient(ServerSocketChannel ssc) throws IOException {
        SocketChannel clientSocket = ssc.accept();
        if (clientSocket == null)   // nonblocking accept() found no pending connection
            return;
        clientSocket.configureBlocking(false);
        SelectionKey key = clientSocket.register(clientSelector, SelectionKey.OP_READ);
        HttpdConnection client = new HttpdConnection(clientSocket);
        key.attach(client);
    }

    void handleClient(SelectionKey key) throws IOException {
        HttpdConnection client = (HttpdConnection) key.attachment();
        if (key.isReadable())
            client.read(key);
        else
            client.write(key);
        // The connection has reset its interest ops; make the selector notice
        clientSelector.wakeup();
    }

    public static void main(String[] argv) throws IOException {
        //new LargerHttpd().run( Integer.parseInt(argv[0]), 3/*threads*/ );
        new LargerHttpd().run(1235, 3/*threads*/);
    }
}

class HttpdConnection {
    static Charset charset = StandardCharsets.ISO_8859_1;
    static Pattern httpGetPattern = Pattern.compile("(?s)GET /?(\\S*).*");
    SocketChannel clientSocket;
    ByteBuffer buff = ByteBuffer.allocateDirect(64 * 1024);
    String request;
    String response;
    FileChannel file;
    int filePosition;

    HttpdConnection(SocketChannel clientSocket) {
        this.clientSocket = clientSocket;
    }

    void read(SelectionKey key) throws IOException {
        // Process at end of stream, or once the data so far ends with a newline
        // (checking position() first avoids get(-1) when a read returns no data)
        if (request == null && (clientSocket.read(buff) == -1
                || (buff.position() > 0 && buff.get(buff.position() - 1) == '\n')))
            processRequest(key);
        else
            key.interestOps(SelectionKey.OP_READ);
    }

    void processRequest(SelectionKey key) {
        buff.flip();
        request = charset.decode(buff).toString();
        Matcher get = httpGetPattern.matcher(request);
        if (get.matches()) {
            request = get.group(1);
            if (request.endsWith("/") || request.equals(""))
                request = request + "index.html";
            System.out.println("Request: " + request);
            try {
                file = new FileInputStream(request).getChannel();
            } catch (FileNotFoundException e) {
                response = "404 Object Not Found";
            }
        } else
            response = "400 Bad Request";

        if (response != null) {
            // Reuse the buffer to hold the encoded text response
            buff.clear();
            charset.newEncoder().encode(CharBuffer.wrap(response), buff, true);
            buff.flip();
        }
        key.interestOps(SelectionKey.OP_WRITE);
    }

    void write(SelectionKey key) throws IOException {
        if (response != null) {
            clientSocket.write(buff);
            if (buff.remaining() == 0)
                response = null;
        } else if (file != null) {
            int remaining = (int) file.size() - filePosition;
            long sent = file.transferTo(filePosition, remaining, clientSocket);
            if (sent >= remaining || remaining <= 0) {
                file.close();
                file = null;
            } else
                filePosition += sent;
        }
        if (response == null && file == null) {
            clientSocket.close();
            key.cancel();
        } else
            key.interestOps(SelectionKey.OP_WRITE);
    }
}
From a bird’s-eye view, the structure of LargerHttpd is the same as that of TinyHttpd. The main class, LargerHttpd, accepts connections, and a
connection class, HttpdConnection,
encapsulates a socket and handles the conversation with the client.
However, this time, instead of each connection object being a Runnable serviced in its own thread, its
functionality is broken into two primary methods called read() and write(). The job of our LargerHttpd is to accept new client socket
connections, wrap them in an instance of HttpdConnection, and then watch the client’s
status with a Selector. Whenever we
detect that a client is ready to send or receive data, we hand off a
Runnable task to our Executor. The task calls read() or write() on the corresponding client, based on
the operation that is ready.
The HttpdConnection object
encapsulates the state of the conversation with the client. Because its
interface is rather coarse, it must keep track of whether it is waiting
to read more input, generate a response, or write file output. The
HttpdConnection also manages the
interest set of its key so that it can effectively schedule itself to be
woken up when it’s ready for reading or writing. The association between
the HttpdConnection and the key is
made by using the key’s attach() and
attachment() methods.
LargerHttpd’s acceptClient() method does several things.
First, it accepts the new socket connection. Next, it configures and
registers it with the selector with an initial interest set for reading.
Finally, it creates the HttpdConnection for the socket, and attaches
the HttpdConnection object to the key
for later retrieval.
The main loop of LargerHttpd is
fairly straightforward. First, we set up the ServerSocketChannel.
This is similar to setting up a plain ServerSocket, except that we must first create
an InetSocketAddress
object to hold the local loopback address and port combination of our
server socket and then explicitly bind our socket to that address with
the ServerSocketChannel bind()
method. We also configure the server socket to nonblocking mode and
register it with our main Selector so
that we can select for client connections in the same loop that we use
to select for client read and write readiness.
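The server-socket setup can be tried on its own. This sketch binds a nonblocking ServerSocketChannel to the loopback address using port 0, so that the operating system picks a free port (an assumption for self-containment; LargerHttpd uses a fixed port). It shows that accept() returns null when no connection is pending, which is why accept code in nonblocking mode should check for null, and that a pending connection is reported as OP_ACCEPT. The class name AcceptDemo is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.*;

public class AcceptDemo {
    // Returns { accept() was null before any client, OP_ACCEPT was selected,
    //           accept() succeeded afterward }.
    public static boolean[] acceptLifecycle() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            // Nonblocking accept() returns null when nothing is pending
            boolean nullBefore = ssc.accept() == null;

            // A blocking client connects; the OS completes the handshake
            try (SocketChannel client = SocketChannel.open(ssc.getLocalAddress())) {
                boolean acceptable = false;
                if (selector.select(2000) > 0) {
                    for (SelectionKey key : selector.selectedKeys())
                        acceptable |= key.isAcceptable();
                    selector.selectedKeys().clear();
                }
                try (SocketChannel accepted = ssc.accept()) {
                    return new boolean[] { nullBefore, acceptable, accepted != null };
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = acceptLifecycle();
        System.out.println("null before=" + r[0] + " acceptable=" + r[1] + " accepted=" + r[2]);
    }
}
```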
In the main select loop, we
check to see whether the key is ready for an accept operation and if so, we call acceptClient(); if not, we set the key’s
interest set to zero with the interestOps() method and dispatch the key to
our handleClient() method via a
Runnable task. It’s important that we
change the interest set to zero before the next loop;
otherwise, the selector could report the same ready condition again on its
next pass, before the worker thread had finished, and we would dispatch a
second task for the same client. Setting the
interest ops to 0 here and restoring them in the HttpdConnection object upon completion ensures
that only one thread is handling a given client at a time.
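The effect of parking a key with interestOps(0) is easy to observe. In this sketch (hypothetical names, with a Pipe standing in for a client socket), data stays available the whole time, yet the key is selected only while OP_READ is in its interest set.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class InterestZeroDemo {
    // Returns the select counts { while armed, while parked, after re-arming }.
    public static int[] selectCounts() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.sink().write(ByteBuffer.wrap(new byte[] { 1 }));  // source stays readable throughout
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            int armed = selector.select(1000);       // the key is reported as ready
            selector.selectedKeys().clear();         // treat it as handed off

            key.interestOps(0);                      // "checked out" to a worker thread
            int parked = selector.selectNow();       // data is waiting, but not of interest

            key.interestOps(SelectionKey.OP_READ);   // the worker hands the key back
            int rearmed = selector.select(1000);     // reported again

            pipe.source().close();
            pipe.sink().close();
            return new int[] { armed, parked, rearmed };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(selectCounts()));
    }
}
```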
For each operation that is ready, we dispatch a task to our
Executor. The task calls handleClient(), passing it the selection key.
From the key, we retrieve the associated HttpdConnection object and call the
appropriate service method based on whether the key is ready for reading
or writing. After that, it’s up to the connection object to do its job.
Each call to the read() method simply
does what would be one iteration of a read loop in a thread-bound
application. Each read gets as much data as is available and checks to see
whether we’ve reached the end of a line (a \n newline character). Upon reaching the end
of a line, we dispatch the call to the processRequest() method, which turns the byte
buffer into text and uses the same techniques as our TinyHttpd to parse the request into a file
pathname. On each incomplete call to read(), we set the interest ops of our key
back to OP_READ. Upon completing the
read and processing the request, we switch to using OP_WRITE because we are now ready to send a
response.
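The end-of-line test in read() can be isolated as a small helper. This sketch simulates a request line arriving in several partial reads by putting chunks into a ByteBuffer in place of channel reads; the helper names are hypothetical. It checks position() before looking at the last byte, since a read that returns no data would otherwise call get(-1).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LineCheckDemo {
    // True when the bytes read so far (indexes 0..position-1) end in '\n'.
    public static boolean endsWithNewline(ByteBuffer buff) {
        int pos = buff.position();
        return pos > 0 && buff.get(pos - 1) == '\n';   // absolute get; position unchanged
    }

    // Returns how many partial "reads" it took to complete a line, or -1.
    public static int readsUntilLineComplete(String... chunks) {
        ByteBuffer buff = ByteBuffer.allocate(1024);
        for (int i = 0; i < chunks.length; i++) {
            // Stands in for clientSocket.read(buff) delivering a partial request
            buff.put(chunks[i].getBytes(StandardCharsets.ISO_8859_1));
            if (endsWithNewline(buff))
                return i + 1;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(readsUntilLineComplete("GET /ind", "ex.html HT", "TP/1.0\n"));  // prints 3
    }
}
```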
The write() method keeps track
of whether it’s sending a text response (error message) or a file by
using the response and file instance variables. When sending a file,
we use the FileChannel’s transferTo() method to
transfer bytes from the file directly to the network socket without
copying them into Java’s memory space. (This is indeed an efficient
little web server.) And that’s about it. When we’re done, we close the
client socket and cancel our key, which causes it to be removed from the
Selector’s key set during the next
select operation (discarding our
HttpdConnection object with
it).
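The bookkeeping around transferTo() can be tried without a network: this sketch sends a temporary file to an in-memory channel, advancing the file position by whatever each call reports, as write() does with filePosition. With a non-socket target the JDK may simply copy through a buffer; the zero-copy benefit applies mainly when the target is a socket on platforms that support it. The class name TransferToDemo is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToDemo {
    // Writes contents to a temp file, then sends the file with transferTo().
    public static String copyFileViaTransferTo(String contents) {
        try {
            Path tmp = Files.createTempFile("transfer", ".txt");
            Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (FileChannel file = FileChannel.open(tmp, StandardOpenOption.READ);
                 WritableByteChannel out = Channels.newChannel(sink)) {
                long position = 0;
                long size = file.size();
                while (position < size) {
                    // transferTo may send fewer bytes than requested; keep the offset
                    position += file.transferTo(position, size - position, out);
                }
            } finally {
                Files.delete(tmp);
            }
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(copyFileViaTransferTo("hello, NIO"));
    }
}
```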
Our example showed SocketChannel used for
nonblocking, selectable I/O in a typical server application. It’s less
common to need nonblocking I/O from a client, but there is certainly no
reason you can’t do it. Perhaps you’re writing a peer-to-peer (P2P)
application that manages many connections from both sides.
For the client side of communications, one additional tool is
provided: a nonblocking socket-connect operation. The process of
creating a TCP connection from the client side involves a handshake with the
remote host (TCP’s three-way handshake). This process normally blocks
until the connection is established. However, the NIO package provides
an alternative that allows you to initiate the connection and then poll
for its status. When set to nonblocking mode, a call to a SocketChannel’s connect() method returns immediately. The
connection is then attempted (and possibly succeeds or fails) in the
background. Later, a Selector can be
used, checking for the OP_CONNECT
flag to see when the socket is ready to “finish connecting.” The
connection is finished by invoking the SocketChannel’s finishConnect() method, which either returns true
or throws an IOException indicating
the failure. (In nonblocking mode, it returns false if the connection is still in progress.) The process of finishing the connection is really more
about collecting the results of the asynchronous
connection—acknowledging its success or failure—than about doing
work.
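Here is a minimal sketch of that sequence against a local listener on the loopback address (port 0, so the OS picks a free port; the names are hypothetical). Because connect() may complete immediately on loopback, the sketch checks its return value before falling back to selecting for OP_CONNECT.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.*;

public class NonblockingConnectDemo {
    // Returns true once a nonblocking client has finished connecting.
    public static boolean connectNonblocking() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // A local listener; the OS completes the handshake even before accept()
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // Returns immediately; true only if the connection completed at once
            if (client.connect(server.getLocalAddress()))
                return client.isConnected();

            client.register(selector, SelectionKey.OP_CONNECT);
            while (selector.select(2000) > 0) {
                for (SelectionKey key : selector.selectedKeys()) {
                    // true when done, false if still pending, IOException on failure
                    if (key.isConnectable() && ((SocketChannel) key.channel()).finishConnect())
                        return true;
                }
                selector.selectedKeys().clear();
            }
            return false;   // timed out
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectNonblocking());
    }
}
```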