Building an HTTP server engine using JBoss Netty and using Apache Web Server for load-balancing with sticky-sessions
JBoss Netty has been my first choice for building a scalable HTTP server engine sans Servlet API and Containers, ever since I first discovered it around 1.5 years back. Its USP is its simplicity combined with extreme performance. It is primarily an NIO framework which provides ready-to-use HTTP Protocol decoder and encoder along with a basic API for HTTP request and response, with which we can very simply and quickly build a scalable HTTP server. The performance test results and testimonials are proof of its excellence, for various protocols, including HTTP, vis-à-vis other NIO frameworks as well as HTTP Servlet containers like Jetty and Tomcat.
The documentation is very comprehensive and intuitive with a number of examples.
Here, I shall show a simple example of creating HTTP server with JBoss Netty which creates and provides a session identity to the user. When the user connects by providing the session identity as a URL parameter and without the name, it welcomes the user with its name. We will use this simple server as an example to do reverse proxy and then load balancing with sticky-sessions with Apache Web Server’s mod_proxy and mod_proxy_balancer modules.
First, we need to create a handler, much like a Servlet that would receive HTTP requests. HTTP requests can be chunked and would have to be combined at the server side before processing. This chunking is not typical of high-bandwidth Internet requests, but is very much possible with GPRS medium. So first, we have an abstract template handler for receiving HTTP requests.
package apachetest;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpRequest;
@ChannelPipelineCoverage("one")
public abstract class AbstractHttpHandler
extends SimpleChannelUpstreamHandler {
protected HttpRequest currentRequest;
public static ChannelGroup allChannels = new DefaultChannelGroup
("HttpServer");
@Override
public void messageReceived(ChannelHandlerContext ctx,
MessageEvent e)
throws Exception {
Object message = e.getMessage();
Channel userChannel = e.getChannel();
boolean processMessage = false;
if (message instanceof HttpChunk) {
HttpChunk httpChunk = (HttpChunk) message;
if (currentRequest==null)
throw new
IllegalStateException("No chunk start");
ChannelBuffer channelBuffer = currentRequest
.getContent();
if (channelBuffer==null)
throw new
IllegalStateException("No chunk start");
ChannelBuffer compositeBuffer =
ChannelBuffers.wrappedBuffer(channelBuffer,
httpChunk.getContent());
currentRequest.setContent(compositeBuffer);
processMessage = httpChunk.isLast();
} else if (message instanceof HttpRequest){
currentRequest = (HttpRequest) message;
processMessage = !currentRequest.isChunked();
}
if (processMessage) {
handleRequest(currentRequest, userChannel);
}
}
protected abstract void handleRequest(HttpRequest request,
Channel userChannel) throws Exception;
@Override
public void channelOpen(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
allChannels.add(e.getChannel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
The above AbstractHttpHandler extends JBoss Netty’s SimpleChannelUpstreamHandler and extends its messageReceived method to assemble HTTP chunks, if required to a complete HTTP Request by simply appending the content body. Once a complete HTTP Request is available, it calls the abstract handleRequest method, whose implementation is left to base classes, with the HttpRequest and the channel from which the request was obtained. The ChannelPipelineCoverage is kept as one so that for every channel connection, a new handler will be assigned. This helps us to ensure that we are dealing with a single channel in every handler and so we can safely assemble their chunked requests.
When a new channel (socket) is opened, it is added to an allChannels group. This group enables to close all client and server sockets gracefully during shutdown with a single method call. We will see this in more detail shortly in another code snippet.
Now, we will create an implementation of this AbstractHttpHandler which assigns unique sessions to users and later identifies the users from their sessions.
package apachetest;
import java.util.List;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
@ChannelPipelineCoverage("one")
public class SimpleHttpHandler extends AbstractHttpHandler {
private String serverInstanceId;
private String sessionInstanceSeparator;
private static ThreadLocal sessionIdBufLocal
= new ThreadLocal(){
@Override
public StringBuffer get() {
StringBuffer strBuf=super.get();
strBuf.setLength(0);
return strBuf;
}
@Override
protected StringBuffer initialValue() {
return new StringBuffer(50);
}
};
public SimpleHttpHandler(String serverInstanceId,
String sessionInstanceSeparator) {
this.serverInstanceId = serverInstanceId;
this.sessionInstanceSeparator = sessionInstanceSeparator;
}
@Override
protected void handleRequest(HttpRequest request,
Channel userChannel) throws Exception {
String requestUri = request.getUri();
QueryStringDecoder queryStringDecoder = new
QueryStringDecoder(requestUri);
Map> params =
queryStringDecoder.getParameters();
String sessionId = null;
int indexOf = requestUri.indexOf("sessionTest");
if (indexOf==-1) {
sendResponse(request, userChannel,
HttpResponseStatus.NOT_FOUND,
"Valid URI supported is /sessionTest");
return;
}
List sessionList = params.get("jsessionid");
if (sessionList!=null && !sessionList.isEmpty()) {
String sessionInList = sessionList.get(0);
if (sessionInList!=null &&
!"".equals(sessionInList.trim())) {
sessionId = sessionInList;
}
}
List userList = params.get("user");
String user = null;
if (userList!=null && !userList.isEmpty()) {
user = userList.get(0);
}
if (sessionId==null) {
if (user==null || "".equals(user)) {
sendResponse(request, userChannel,
HttpResponseStatus.UNUATHORIZED,
"No valid session, please login with user");
return;
}
UserSessionDetails userSessionDetails =
SessionIdHolder.userIdToUserMap.get(user);
if (userSessionDetails==null) {
userSessionDetails = new UserSessionDetails();
userSessionDetails.userChannel = userChannel;
userSessionDetails.userId = user;
SessionIdHolder.userIdToUserMap
.putIfAbsent(user, userSessionDetails);
userSessionDetails =
SessionIdHolder.userIdToUserMap.get(user);
}
synchronized(userSessionDetails) {
if (userSessionDetails.sessionId!=null) {
SessionIdHolder.sessionIdToUserMap.
remove(userSessionDetails.sessionId);
}
sessionId = userSessionDetails.sessionId =
IdGenerator.generateUniqueId();
userSessionDetails.sessionCreationTime =
System.currentTimeMillis();
userSessionDetails.lastAccessTime =
System.currentTimeMillis();
SessionIdHolder.sessionIdToUserMap.put(sessionId,
userSessionDetails);
SessionIdHolder.userIdToUserMap.put(user,
userSessionDetails);
}
StringBuffer sessionIdToReturn =
sessionIdBufLocal.get();
sessionIdToReturn.append(sessionId);
if (serverInstanceId!=null &&
!"".equals(serverInstanceId)) {
sessionIdToReturn.append(
sessionInstanceSeparator)
.append(serverInstanceId);
}
sendResponse(request,userChannel,
HttpResponseStatus.OK,
"Session Id is "+sessionIdToReturn);
} else {
//strip off the identity of the server
int serverIdIndex =
sessionId.indexOf(sessionInstanceSeparator);
if (serverIdIndex>=0)
sessionId = sessionId.substring(0,
serverIdIndex);
UserSessionDetails userSessionDetails =
SessionIdHolder.sessionIdToUserMap
.get(sessionId);
if (userSessionDetails==null) {
sendResponse(request, userChannel,
HttpResponseStatus.UNUATHORIZED,
"No valid session, please login with user");
return;
}
synchronized(userSessionDetails) {
userSessionDetails.userChannel = userChannel;
userSessionDetails.lastAccessTime =
System.currentTimeMillis();
UserSessionDetails userDetails =
SessionIdHolder.userIdToUserMap
.get(userSessionDetails.userId);
if (userDetails==null ||
userDetails!=userSessionDetails) {
SessionIdHolder.userIdToUserMap
.put(userSessionDetails.userId,
userSessionDetails);
}
}
sendResponse(request,userChannel,
HttpResponseStatus.OK,
"Welcome user "+userSessionDetails.userId
+" to server "+serverInstanceId);
}
}
private void sendResponse(HttpRequest request,
Channel userChannel,
HttpResponseStatus responseStatus, String responseBody)
throws Exception {
boolean close =
HttpHeaders.Values.CLOSE
.equalsIgnoreCase(request
.getHeader(HttpHeaders.Names.CONNECTION)) ||
request.getProtocolVersion()
.equals(HttpVersion.HTTP_1_0) &&
!HttpHeaders.Values.KEEP_ALIVE
.equalsIgnoreCase(request
.getHeader(HttpHeaders.Names.CONNECTION));
HttpResponse response = new DefaultHttpResponse
(HttpVersion.HTTP_1_1, responseStatus);
response.setHeader(HttpHeaders.Names.CACHE_CONTROL,
"no-store");
response.setHeader(HttpHeaders.Names.PRAGMA, "no-cache");
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
""+((responseBody==null)?0:responseBody.length()));
byte[] array = responseBody.getBytes("UTF8");
if (responseBody!=null)
response.setContent(
ChannelBuffers.wrappedBuffer(array));
ChannelFuture future = userChannel.write(response);
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
The SimpleHttpHandler is created with a server instance id and a session instance separator. Each server is provided a unique identity in a cluster of servers. Sessions created for users would be given a session id with the following format - . Thus, if the server instance name is ‘netty1’ and session instance separator is ‘.’ and the unique id created is ‘2C855D9FB0710111’, then the session id given to the user would be the concatenated value ‘2C855D9FB0710111.netty1’. This would help Apache Web Server to maintain sticky sessions, as it would be able to parse the session id and find that it has been created by netty1 instance and all the next requests with this session id must be passed to this instance.
In our handleRequest implementation, we simply generate a new session id for every new user using our own IdGenerator. Notice how we can use the request.getUri() to check that the requests have been made to the expected URI which is ‘/sessionTest’. The QueryStringDecoder from the Netty API helps in extracting request parameters as name-value(s) map. If the request provides the username, we simply create a new session id and store the user session details in our local memory map in SessionIdHolder. We send the response to the user – ‘Session ID is <…..>’. In the next request, the user can copy this session id from the response and send it as a request parameter. Then the server extracts the user session details mapped to the session id, after stripping off the server instance id, and finds out the corresponding user name. It sends the response ‘Welcome user <….> to server <…>’. If it does not find the session in its map, it returns the response ‘No valid session, please login with user’.
The sessions can be shared across servers using replicated caching or a shared database. However, to keep our example simple, and to illustrate load balancing with session stickiness, we have not replicated the sessions.
The sendResponse method sends a response to the user channel from which the request was received. It sets the desired response status – 200 for OK, 403 for UNAUTHORIZED. The HttpResponseStatus from Netty API provides all the HTTP response constants. We also set cache-control and pragma headers to ensure that responses are not cached by the client browser. We also set the content-length header. If the request had a CLOSE header or was using HTTP1.0 protocol with no KEEP_ALIVE header, we close the client socket channel after providing the response. If not, the same socket channel will be used for multiple HTTP request-response cycles between the client and server.
Now that we have the handlers ready, we write the main class to start the HTTP server.
package apachetest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
public class SessionTestServer {
public static void main(String[] args) {
int port = 9080;
String[] arguments = new String[] {"","."};
if (args.length>0 && !"".equals(args[0])
&& args[0]!=null) {
arguments[0] = args[0];
}
if (args.length>1 && !"".equals(args[1])
&& args[1]!=null) {
arguments[1] = args[1];
}
if (args.length>2 && !"".equals(args[2])
&& args[2]!=null) {
try {
int portGiven = Integer.parseInt(args[2]);
if (portGiven<=1024 || portGiven>9999);
else
port = portGiven;
} catch (Exception e) { }
}
String serverInstanceId = arguments[0];
String sessionInstanceSeparator = arguments[1];
NioServerSocketChannelFactory factory = new
NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new
HttpServerPipelineFactory(serverInstanceId,
sessionInstanceSeparator));
Channel serverChannel =
bootstrap.bind(new InetSocketAddress(port));
if (serverChannel==null) {
System.out.println("Unable to open
server at port "+port);
factory.releaseExternalResources();
return;
}
AbstractHttpHandler.allChannels.add(serverChannel);
System.out.println("Started server at port "+port
+" Press any key to stop server");
try {
System.in.read();
} catch (IOException e) {
} finally {
System.out.println("Shutting down server");
ChannelGroupFuture future =
AbstractHttpHandler.allChannels.close();
future.awaitUninterruptibly();
factory.releaseExternalResources();
}
}
private static class HttpServerPipelineFactory implements
ChannelPipelineFactory {
private String serverInstanceId;
private String sessionInstanceSeparator;
public HttpServerPipelineFactory(String serverInstanceId,
String sessionInstanceSeparator) {
this.serverInstanceId = serverInstanceId;
this.sessionInstanceSeparator =
sessionInstanceSeparator;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder",
new HttpRequestDecoder());
pipeline.addLast("encoder",
new HttpResponseEncoder());
pipeline.addLast("handler", new
SimpleHttpHandler(serverInstanceId,
sessionInstanceSeparator));
return pipeline;
}
}
}
The program accepts the server instance id, the session separator and the server port number to bind for accepting client requests in the command-line arguments. These arguments are optional, the default value for session separator is ‘.’ and the server port number is 9080.
Like any other server-side implementation using Netty, we create a NioSocketChannelFactory and bootstrap the server with our ChannelPipelineFactory implementation and bind it to the acceptor port. We also add the server channel to the allChannels group and wait for user key stroke to stop the server. While stopping the server, we need to simply close the allChannels group and release external resources of our factory, which would stop underlying thread pools.
Now let us look at the ChannelPipelineFactory implementation. For every new client channel that connects to the server, a new pipeline (which is based on interceptor-chain design pattern) is created. The ready-made Netty HttpRequestDecoder and HttpResponseDecoder, and finally our very own SimpleHttpHandler are added to the pipeline in that order.
This is all that is needed to create a simple HTTP server using Netty API.
Now let us start the server using the command line
java apachetest.SessionTestServer netty1 . 9080
This starts a server listening for HTTP requests at port 9080 and with server instance id ‘netty1’ and session id separator as ‘.’ We get the command-line print-
Started server at port 9080 Press any key to stop server
We get back a response on the browser window – ‘Session id is ….’
Now, we copy the session id and change the URL address at the browser to http://localhost:9080/sessionTest?jsessionid=0140C3892B4F5348.netty1
- where the ‘jsessionid’ request parameter value would be the session id which was assigned to us, in the last response.
We now get the response – ‘Welcome user Archanaa to server netty1’
At the console where netty1 process has been started, press any key (like Enter). The server process will stop with the message – ‘Shutting down server’
Now, let us use Apache Web Server for load-balancing two such HTTP servers with session stickiness.
After installing the Apache Web Server, which would listen at port 80 on your machine, go to conf/ directory and make the following changes in httpd.conf configuration file.
- Uncomment the modules mod_proxy, mod_proxy_balancer, mod_proxy_http and mod_rewrite.
- Add the following VirtualHost configuration to the end of the httpd.conf file.
ServerAdmin archanaa.panda@gmail.com
DocumentRoot "C:/apps/Apache Software Foundation/Apache2.2/htdocs"
ProxyRequests Off
ErrorLog logs/test.in-error_log
CustomLog logs/test.in-access_log common
Order deny,allow
Allow from all
ProxyPreserveHost On
ProxyPass /sessionTest/ balancer://mycluster/ stickysession=jsessionid
ProxyPass /sessionTest balancer://mycluster stickysession=jsessionid
Order deny,allow
Allow from all
BalancerMember http://localhost:9080/sessionTest route=netty1
BalancerMember http://localhost:9081/sessionTest route=netty2
SetHandler balancer-manager
Order deny,allow
Allow from all
We intend to start two servers with names netty1 and netty2 at ports 9080 and 9081, respectively. We have specified these URLs and their server instances in the BalancerMember configuration under Proxy tag. In the ProxyPass configuration, we have specified the stickysession as ‘jsessionid’, so that Apache would look for it in the HTTP request parameters or the cookies. We will pass the sessionid in the HTTP request parameter, as we had done earlier. We will choose the session id separator as ‘.’ Because that is the default separator recognized by Apache Web Server.
Now let us start the two server instances like so-
java apachetest.SessionTestServer netty1 . 9080
java apachetest.SessionTestServer netty2 . 9081
After that, let us start the Apache httpd
With this, the Apache Web Server will now act as a reverse-proxy as well as load balancer with sticky-sessions for the two server instances.
We are provided with a user session id –
Similarly, in a different browser window or tab, let us connect with a different user http://localhost/sessionTest?user=Ankur and we would get a new session id.
Now let us change the request to give the jsessionid parameter in the request instead of the user and see the results.
No matter how many times we connect with the same session id and in whatever order, we always get the response from the correct server showing that session stickiness is working properly.
Now let us stop one particular server instance, say netty1 and try to connect its user. Now if we fire a request from our browser with netty1’s session, it would be failed over to the next instance, that is netty2. Since we have not replicated our session, we get the response –
Meanwhile, the other user is able to connect as before successfully with netty2’s session.
Thus we see that our simple experiment has worked well.
There is no limit to how we can use JBoss Netty to build our own HTTP server. In the above example, we have kept the handler synchronous. But there is no restriction for saving the HttpRequest along with a unique asynchronous completion token in a map and doing asynchronous request processing in a different thread. Once the request processing is done, the request can be retrieved from the map and we can send the response as we have done in this simple example above. To add robustness, we can keep a time-to-live for requests and on timeout, remove the corresponding request and send a REQUEST_TIMEOUT response to the client.
With that I conclude this series and I hope I have been able to showcase a fine alternative to using Servlet API and Containers for building simple HTTP servers.