package apachetest;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpRequest;
/**
 * Upstream handler that assembles a complete {@link HttpRequest} from the
 * decoded messages on a channel and hands it to {@link #handleRequest}.
 *
 * <p>A non-chunked request is dispatched as soon as it is received. For a
 * chunked request, the content of every following {@link HttpChunk} is
 * appended to the request's content and the request is dispatched when the
 * last chunk arrives.
 */
@ChannelPipelineCoverage("one")
public abstract class AbstractHttpHandler
        extends SimpleChannelUpstreamHandler {

    /**
     * The request currently being assembled; {@code null} when no request is
     * in progress.
     */
    protected HttpRequest currentRequest;

    /** Every channel passed to {@link #channelOpen} is added to this group. */
    public static ChannelGroup allChannels =
            new DefaultChannelGroup("HttpServer");

    /**
     * Accumulates request content and dispatches the request once complete.
     *
     * @throws IllegalStateException if a chunk arrives while no request is in
     *         progress, or the request in progress has no content buffer
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        Object message = e.getMessage();
        Channel userChannel = e.getChannel();
        boolean processMessage = false;

        if (message instanceof HttpChunk) {
            HttpChunk httpChunk = (HttpChunk) message;
            if (currentRequest == null) {
                throw new IllegalStateException("No chunk start");
            }
            ChannelBuffer channelBuffer = currentRequest.getContent();
            if (channelBuffer == null) {
                throw new IllegalStateException("No chunk start");
            }
            // Wrap the content received so far together with this chunk's
            // content and store the combined buffer back on the request.
            ChannelBuffer compositeBuffer = ChannelBuffers.wrappedBuffer(
                    channelBuffer, httpChunk.getContent());
            currentRequest.setContent(compositeBuffer);
            processMessage = httpChunk.isLast();
        } else if (message instanceof HttpRequest) {
            currentRequest = (HttpRequest) message;
            processMessage = !currentRequest.isChunked();
        }

        if (processMessage) {
            try {
                handleRequest(currentRequest, userChannel);
            } finally {
                // Forget the dispatched request so that a stray chunk arriving
                // afterwards fails the "No chunk start" check above instead of
                // being appended to a request that was already handled.
                currentRequest = null;
            }
        }
    }

    /**
     * Processes one fully received request.
     *
     * @param request     the complete request, including any chunked content
     * @param userChannel the channel on which the request was received
     * @throws Exception if processing fails
     */
    protected abstract void handleRequest(HttpRequest request,
            Channel userChannel) throws Exception;

    /** Registers the newly opened channel in {@link #allChannels}. */
    @Override
    public void channelOpen(ChannelHandlerContext ctx,
            ChannelStateEvent e) throws Exception {
        allChannels.add(e.getChannel());
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
            ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}
package apachetest;
import java.util.List;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
/**
 * Handler for the {@code sessionTest} URI that issues and validates session
 * ids held in {@code SessionIdHolder}.
 *
 * <ul>
 * <li>A request with a {@code user} parameter and no {@code jsessionid}
 *     gets a new session id, returned with the server instance id appended.
 * <li>A request with a {@code jsessionid} parameter is looked up (after the
 *     server instance suffix is stripped) and answered with a welcome
 *     message, or rejected if the session is unknown.
 * </ul>
 */
@ChannelPipelineCoverage("one")
public class SimpleHttpHandler extends AbstractHttpHandler {

    private final String serverInstanceId;
    private final String sessionInstanceSeparator;

    /**
     * Per-thread scratch buffer used to build the session id returned to the
     * client; {@code get()} empties the buffer before handing it out.
     */
    private static ThreadLocal<StringBuffer> sessionIdBufLocal =
            new ThreadLocal<StringBuffer>() {
        @Override
        public StringBuffer get() {
            StringBuffer strBuf = super.get();
            strBuf.setLength(0);
            return strBuf;
        }

        @Override
        protected StringBuffer initialValue() {
            return new StringBuffer(50);
        }
    };

    /**
     * @param serverInstanceId         id appended to issued session ids and
     *                                 reported in the welcome message; may be
     *                                 null or empty, in which case nothing is
     *                                 appended
     * @param sessionInstanceSeparator text placed between the session id and
     *                                 the server instance id
     */
    public SimpleHttpHandler(String serverInstanceId,
            String sessionInstanceSeparator) {
        this.serverInstanceId = serverInstanceId;
        this.sessionInstanceSeparator = sessionInstanceSeparator;
    }

    /**
     * Serves one request: rejects URIs that do not contain
     * {@code sessionTest}, creates a session when only a user is supplied,
     * and resumes a session when a {@code jsessionid} is supplied.
     */
    @Override
    protected void handleRequest(HttpRequest request,
            Channel userChannel) throws Exception {
        String requestUri = request.getUri();
        QueryStringDecoder queryStringDecoder =
                new QueryStringDecoder(requestUri);
        Map<String, List<String>> params =
                queryStringDecoder.getParameters();

        if (requestUri.indexOf("sessionTest") == -1) {
            sendResponse(request, userChannel,
                    HttpResponseStatus.NOT_FOUND,
                    "Valid URI supported is /sessionTest");
            return;
        }

        // A blank jsessionid is treated the same as a missing one.
        String sessionId = null;
        String sessionInList = firstValue(params, "jsessionid");
        if (sessionInList != null && !"".equals(sessionInList.trim())) {
            sessionId = sessionInList;
        }
        String user = firstValue(params, "user");

        if (sessionId == null) {
            if (user == null || "".equals(user)) {
                // NOTE(review): constant name kept exactly as in the original
                // source; presumably it matches the Netty release in use.
                // Confirm before renaming.
                sendResponse(request, userChannel,
                        HttpResponseStatus.UNUATHORIZED,
                        "No valid session, please login with user");
                return;
            }
            UserSessionDetails userSessionDetails =
                    SessionIdHolder.userIdToUserMap.get(user);
            if (userSessionDetails == null) {
                userSessionDetails = new UserSessionDetails();
                userSessionDetails.userChannel = userChannel;
                userSessionDetails.userId = user;
                // Another thread may have registered the same user in the
                // meantime; re-read so that we continue with whichever
                // instance is actually in the map.
                SessionIdHolder.userIdToUserMap
                        .putIfAbsent(user, userSessionDetails);
                userSessionDetails =
                        SessionIdHolder.userIdToUserMap.get(user);
            }
            synchronized (userSessionDetails) {
                // Drop the user's previous session, if any, before issuing
                // a new one.
                if (userSessionDetails.sessionId != null) {
                    SessionIdHolder.sessionIdToUserMap
                            .remove(userSessionDetails.sessionId);
                }
                sessionId = userSessionDetails.sessionId =
                        IdGenerator.generateUniqueId();
                userSessionDetails.sessionCreationTime =
                        System.currentTimeMillis();
                userSessionDetails.lastAccessTime =
                        System.currentTimeMillis();
                SessionIdHolder.sessionIdToUserMap.put(sessionId,
                        userSessionDetails);
                SessionIdHolder.userIdToUserMap.put(user,
                        userSessionDetails);
            }
            StringBuffer sessionIdToReturn = sessionIdBufLocal.get();
            sessionIdToReturn.append(sessionId);
            if (serverInstanceId != null
                    && !"".equals(serverInstanceId)) {
                sessionIdToReturn.append(sessionInstanceSeparator)
                        .append(serverInstanceId);
            }
            sendResponse(request, userChannel,
                    HttpResponseStatus.OK,
                    "Session Id is " + sessionIdToReturn);
        } else {
            // Strip off the identity of the server.
            int serverIdIndex =
                    sessionId.indexOf(sessionInstanceSeparator);
            if (serverIdIndex >= 0) {
                sessionId = sessionId.substring(0, serverIdIndex);
            }
            UserSessionDetails userSessionDetails =
                    SessionIdHolder.sessionIdToUserMap.get(sessionId);
            if (userSessionDetails == null) {
                sendResponse(request, userChannel,
                        HttpResponseStatus.UNUATHORIZED,
                        "No valid session, please login with user");
                return;
            }
            synchronized (userSessionDetails) {
                userSessionDetails.userChannel = userChannel;
                userSessionDetails.lastAccessTime =
                        System.currentTimeMillis();
                // Make sure the user map points at this session's details.
                UserSessionDetails userDetails =
                        SessionIdHolder.userIdToUserMap
                                .get(userSessionDetails.userId);
                if (userDetails != userSessionDetails) {
                    SessionIdHolder.userIdToUserMap
                            .put(userSessionDetails.userId,
                                    userSessionDetails);
                }
            }
            sendResponse(request, userChannel,
                    HttpResponseStatus.OK,
                    "Welcome user " + userSessionDetails.userId
                            + " to server " + serverInstanceId);
        }
    }

    /**
     * Returns the first value of the named query parameter, or {@code null}
     * when the parameter is absent or has no values.
     */
    private static String firstValue(Map<String, List<String>> params,
            String name) {
        List<String> values = params.get(name);
        if (values == null || values.isEmpty()) {
            return null;
        }
        return values.get(0);
    }

    /**
     * Writes an HTTP/1.1 response with the given status and body to the
     * channel, and closes the channel afterwards when the request asked for
     * it.
     *
     * @param request        the request being answered; its Connection header
     *                       and protocol version decide whether to close
     * @param userChannel    the channel to write the response to
     * @param responseStatus the status of the response
     * @param responseBody   the body text, or {@code null} for no content
     */
    private void sendResponse(HttpRequest request,
            Channel userChannel,
            HttpResponseStatus responseStatus, String responseBody)
            throws Exception {
        String connection =
                request.getHeader(HttpHeaders.Names.CONNECTION);
        // Close on an explicit "close", or on HTTP/1.0 without keep-alive.
        boolean close =
                HttpHeaders.Values.CLOSE.equalsIgnoreCase(connection)
                || (request.getProtocolVersion()
                        .equals(HttpVersion.HTTP_1_0)
                    && !HttpHeaders.Values.KEEP_ALIVE
                        .equalsIgnoreCase(connection));

        // Encode first: Content-Length must be the number of bytes sent, not
        // the number of characters, and a null body must not be dereferenced.
        byte[] array = (responseBody == null)
                ? new byte[0]
                : responseBody.getBytes("UTF8");

        HttpResponse response = new DefaultHttpResponse(
                HttpVersion.HTTP_1_1, responseStatus);
        response.setHeader(HttpHeaders.Names.CACHE_CONTROL, "no-store");
        response.setHeader(HttpHeaders.Names.PRAGMA, "no-cache");
        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
                String.valueOf(array.length));
        if (responseBody != null) {
            response.setContent(ChannelBuffers.wrappedBuffer(array));
        }

        ChannelFuture future = userChannel.write(response);
        if (close) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
In our handleRequest implementation, we simply generate a new session id for every new user using our own IdGenerator. Notice how we can use the request.getUri() to check that the requests have been made to the expected URI which is ‘/sessionTest’. The QueryStringDecoder from the Netty API helps in extracting request parameters as name-value(s) map. If the request provides the username, we simply create a new session id and store the user session details in our local memory map in SessionIdHolder. We send the response to the user – ‘Session ID is <…..>’. In the next request, the user can copy this session id from the response and send it as a request parameter. Then the server extracts the user session details mapped to the session id, after stripping off the server instance id, and finds out the corresponding user name. It sends the response ‘Welcome user <….> to server <…>’. If it does not find the session in its map, it returns the response ‘No valid session, please login with user’.
The sessions can be shared across servers using replicated caching or a shared database. However, to keep our example simple, and to illustrate load balancing with session stickiness, we have not replicated the sessions.
The sendResponse method sends a response to the user channel from which the request was received. It sets the desired response status – 200 for OK, 401 for UNAUTHORIZED, 404 for NOT_FOUND. The HttpResponseStatus class from the Netty API provides all the HTTP response constants. We also set the cache-control and pragma headers to ensure that responses are not cached by the client browser, and we set the content-length header. If the request had a ‘Connection: close’ header, or was using the HTTP/1.0 protocol without a ‘Connection: keep-alive’ header, we close the client socket channel after writing the response. If not, the same socket channel will be used for multiple HTTP request-response cycles between the client and server.
Now that we have the handlers ready, we write the main class to start the HTTP server.
package apachetest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
public class SessionTestServer {
public static void main(String[] args) {
int port = 9080;
String[] arguments = new String[] {"","."};
if (args.length>0 && !"".equals(args[0])
&& args[0]!=null) {
arguments[0] = args[0];
}
if (args.length>1 && !"".equals(args[1])
&& args[1]!=null) {
arguments[1] = args[1];
}
if (args.length>2 && !"".equals(args[2])
&& args[2]!=null) {
try {
int portGiven = Integer.parseInt(args[2]);
if (portGiven<=1024 || portGiven>9999);
else
port = portGiven;
} catch (Exception e) { }
}
String serverInstanceId = arguments[0];
String sessionInstanceSeparator = arguments[1];
NioServerSocketChannelFactory factory = new
NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new
HttpServerPipelineFactory(serverInstanceId,
sessionInstanceSeparator));
Channel serverChannel =
bootstrap.bind(new InetSocketAddress(port));
if (serverChannel==null) {
System.out.println("Unable to open
server at port "+port);
factory.releaseExternalResources();
return;
}
AbstractHttpHandler.allChannels.add(serverChannel);
System.out.println("Started server at port "+port
+" Press any key to stop server");
try {
System.in.read();
} catch (IOException e) {
} finally {
System.out.println("Shutting down server");
ChannelGroupFuture future =
AbstractHttpHandler.allChannels.close();
future.awaitUninterruptibly();
factory.releaseExternalResources();
}
}
private static class HttpServerPipelineFactory implements
ChannelPipelineFactory {
private String serverInstanceId;
private String sessionInstanceSeparator;
public HttpServerPipelineFactory(String serverInstanceId,
String sessionInstanceSeparator) {
this.serverInstanceId = serverInstanceId;
this.sessionInstanceSeparator =
sessionInstanceSeparator;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder",
new HttpRequestDecoder());
pipeline.addLast("encoder",
new HttpResponseEncoder());
pipeline.addLast("handler", new
SimpleHttpHandler(serverInstanceId,
sessionInstanceSeparator));
return pipeline;
}
}
}
java apachetest.SessionTestServer netty1 . 9080
Started server at port 9080 Press any key to stop server
We get back a response in the browser window – ‘Session Id is ….’
Now, we copy the session id and change the URL address at the browser to http://localhost:9080/sessionTest?jsessionid=0140C3892B4F5348.netty1
- where the ‘jsessionid’ request parameter value would be the session id which was assigned to us, in the last response.
We now get the response – ‘Welcome user Archanaa to server netty1’
At the console where netty1 process has been started, press any key (like Enter). The server process will stop with the message – ‘Shutting down server’
Now, let us use Apache Web Server for load-balancing two such HTTP servers with session stickiness.
After installing the Apache Web Server, which would listen at port 80 on your machine, go to conf/ directory and make the following changes in httpd.conf configuration file.
- Uncomment the modules mod_proxy, mod_proxy_balancer, mod_proxy_http and mod_rewrite.
- Add the following VirtualHost configuration to the end of the httpd.conf file.
<VirtualHost *:80>
ServerAdmin archanaa.panda@gmail.com
DocumentRoot "C:/apps/Apache Software Foundation/Apache2.2/htdocs"
ProxyRequests Off
ErrorLog logs/test.in-error_log
CustomLog logs/test.in-access_log common
<Proxy *>
Order deny,allow
Allow from all
</Proxy>
ProxyPreserveHost On
ProxyPass /sessionTest/ balancer://mycluster/ stickysession=jsessionid
ProxyPass /sessionTest balancer://mycluster stickysession=jsessionid
<Proxy balancer://mycluster>
Order deny,allow
Allow from all
BalancerMember http://localhost:9080/sessionTest route=netty1
BalancerMember http://localhost:9081/sessionTest route=netty2
</Proxy>
<Location /balancer-manager>
SetHandler balancer-manager
Order deny,allow
Allow from all
</Location>
</VirtualHost>
Now let us start the two server instances like so-
java apachetest.SessionTestServer netty1 . 9080
java apachetest.SessionTestServer netty2 . 9081
Similarly, in a different browser window or tab, let us connect with a different user http://localhost/sessionTest?user=Ankur and we would get a new session id.
Now let us change the request to give the jsessionid parameter in the request instead of the user and see the results.
No matter how many times we connect with the same session id and in whatever order, we always get the response from the correct server showing that session stickiness is working properly.
Now let us stop one particular server instance, say netty1, and try to connect as its user. If we fire a request from our browser with netty1’s session, it fails over to the next instance, that is netty2. Since we have not replicated our sessions, netty2 does not know this session and we get the response – ‘No valid session, please login with user’.
Meanwhile, the other user is able to connect as before successfully with netty2’s session.
Thus we see that our simple experiment has worked well.
There is no limit to how we can use JBoss Netty to build our own HTTP server. In the above example, we have kept the handler synchronous. But there is no restriction for saving the HttpRequest along with a unique asynchronous completion token in a map and doing asynchronous request processing in a different thread. Once the request processing is done, the request can be retrieved from the map and we can send the response as we have done in this simple example above. To add robustness, we can keep a time-to-live for requests and on timeout, remove the corresponding request and send a REQUEST_TIMEOUT response to the client.
With that I conclude this series and I hope I have been able to showcase a fine alternative to using Servlet API and Containers for building simple HTTP servers.