-
December 15th, 2012, 01:55 PM
#1
Java NIO multithreaded synchronized problem
Hello! I'm making a multiclient server in java and and the moment it just echoes back what gets sent to it.
My problem is that when my worker thread(ProtocolWorker, parses packets and sends data) gets hold up(lest say i set a breakpoint in eclipse) and then continues the values on different objects change.
Ill give you an example once i provided some code:
Main.java:read():
Code:
private void read(SelectionKey key) throws IOException {
key.interestOps(SelectionKey.OP_READ);
//Now its just a socket channel since we've already accepted the connection into a socket
SocketChannel socketChannel = (SocketChannel) key.channel();
//The byte buffer into which we'll read data when it's available
ByteBuffer buffer = ByteBuffer.allocate(65536);
//Remove old data
buffer.clear();
//Attempt to read from the channel
int numRead;
try {
numRead = socketChannel.read(buffer);
}catch(IOException e){
//The remote connection has been closed.
//Cancel selection and close the channel.
key.cancel();
socketChannel.close();
return;
}
if(numRead == -1) {//Socket error
//The remote connection has been closed.
//Cancel selection and close the channel.
this.dataMap.remove(socketChannel);
Socket socket = socketChannel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
System.out.println("Connection closed by client: " + remoteAddr);
key.cancel();
socketChannel.close();
return;
}
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
System.out.println("[READ] - Recieved: " + new String(data, "US-ASCII") + " from " + socketChannel.socket().getRemoteSocketAddress());
DataEvent socketData = (DataEvent) this.dataMap.get(socketChannel);
//Synchronized socketData
synchronized(socketData){
System.out.println("Someone entered the Main thread, trying to read");
this.worker.queueData(this, key, data, numRead);
socketData.notify();
}
}
Main.java:write():
Code:
public synchronized void write(SelectionKey key) throws IOException{
SocketChannel socketChannel = (SocketChannel) key.channel();
key.interestOps(SelectionKey.OP_WRITE);
DataEvent pendingData = this.dataMap.get(socketChannel);
//Synchronized pendingData
synchronized(pendingData){
System.out.println("Someone entered the Main thread, trying to write");
//Get the data and remove the Data object from queue
ByteBuffer data = ByteBuffer.wrap(pendingData.data.toByteArray());
//Write to the data to the client(s)
int r = socketChannel.write(ByteBuffer.wrap(data.array()));
System.out.println("[WRITE] - Sending... Sent " + data.array().length + " bytes of data");
}
key.interestOps(SelectionKey.OP_READ);
}
ProtocolWorker.java:queueData():
Code:
public void queueData(Main server, SelectionKey key, byte[] data, int count) throws IOException{
SocketChannel socketChannel = (SocketChannel) key.channel();
DataEvent socketData = server.dataMap.get(socketChannel);
//Synchornized with read and write, only one may use socketData at the same time
synchronized(socketData){
System.out.println("Someone entered the ProtocolWorker thread");
socketData.key = key;
socketData.server = server;
socketData.readSize += count;
if(socketData.tmp.size() >= HEADER_SIZE && socketData.packetTotalSize == 0){
socketData.setHeader(socketData.tmp.toByteArray());
socketData.tmp.write(data);
}else if(data.length >= HEADER_SIZE && socketData.packetTotalSize == 0){
socketData.setHeader(data);
socketData.tmp.write(data);
if(socketData.tmp.size() == socketData.packetTotalSize){
//Redefine socketData.data, remove old data
socketData.data = new ByteArrayOutputStream(0);
//Copy content of tmp to data
socketData.data.write(socketData.tmp.toByteArray());
//Write the data to the client.
server.write(socketData.key);
//Reset the data, tmp, packetTotalSize, packetSize, readSize and headerData since they're not important anymore and must be undefined.
socketData.resetHeader();
socketData.data = new ByteArrayOutputStream(1);
socketData.tmp = new ByteArrayOutputStream(1);
socketData.packetTotalSize = 0;
socketData.packetSize = 0;
socketData.readSize = 0;
}
}else if(socketData.readSize < socketData.packetTotalSize){
socketData.tmp.write(data);
}else if(socketData.readSize == socketData.packetTotalSize){
socketData.tmp.write(data);
//Redefine socketData.data, remove old data
socketData.data = new ByteArrayOutputStream(0);
//Copy content of tmp to data
socketData.data.write(socketData.tmp.toByteArray());
//Write the data to the client.
server.write(socketData.key);
//Reset the data, tmp, packetTotalSize, packetSize, readSize and headerData since they're not important anymore and must be undefined.
socketData.resetHeader();
socketData.data = new ByteArrayOutputStream(1);
socketData.tmp = new ByteArrayOutputStream(1);
socketData.packetTotalSize = 0;
socketData.packetSize = 0;
socketData.readSize = 0;
}else if(socketData.readSize > socketData.packetTotalSize){
//Read data into tmp
socketData.tmp.write(data);
//Write the command(including header) to socketData.data
socketData.data.write(socketData.tmp.toByteArray(), HEADER_SIZE, socketData.packetSize);
//Define newTmp to hold the rest of the data received
byte[] newTmp = new byte[count-socketData.packetTotalSize];
//Copy socketData.tmp to newTmp WITHOUT the header(!!)
System.arraycopy(socketData.tmp.toByteArray(), socketData.packetTotalSize, newTmp, 0, count-socketData.packetTotalSize);
//Redefine socketData.tmp
socketData.tmp = new ByteArrayOutputStream(0);
//Write newTmp to the redefined socketData.tmp
socketData.tmp.write(newTmp);
//Reset the packetTotalSize, packetSize and readSize since they're not important anymore and must be undefined.
socketData.packetTotalSize = 0;
socketData.packetSize = 0;
socketData.readSize = 0;
}
socketData.notify();
}
}
DataEvent.java:
Code:
package server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
public class DataEvent {
public Main server;
public SelectionKey key;
private final int HEADER_SIZE = 4;
//The size(as an int) of the packet, 4 first bytes
public byte[] headerData = new byte[4];
//The size of the data received by read()
public int dataSize = 0;
//The size of the packet, defined by first four bytes in data
public int packetSize = 0;
//The size of the packet including header
public int packetTotalSize = 0;
//The size of what's read of the total packet size
public int readSize = 0;
public ByteArrayOutputStream data = new ByteArrayOutputStream(0);
public ByteArrayOutputStream tmp = new ByteArrayOutputStream(0);
//public byte[] data = new byte[0];
//public byte[] tmp = new byte[0];
public boolean newCommand = true;
public DataEvent(byte[] data) throws IOException{
this.data.write(data);
for(int i=0; i < HEADER_SIZE; i++){
this.headerData[i] = -1;
}
}
public DataEvent(){
for(int i=0; i < HEADER_SIZE; i++){
this.headerData[i] = -1;
}
}
public DataEvent(Main server, SelectionKey key){
this.server = server;
this.key = key;
for(int i=0; i < HEADER_SIZE; i++){
this.headerData[i] = -1;
}
}
public void setHeader(byte[] data){
boolean allSet = true;
int y = 0;
for(int i=0; i < HEADER_SIZE; i++){
if(this.headerData[i] == -1){
this.headerData[i] = data[y];
y++;
}
}
for(int i=0; i < HEADER_SIZE; i++){
if(this.headerData[i] == -1){
allSet = false;
}
}
if(allSet == true){
this.packetSize = bytesToInt();
this.packetTotalSize = this.packetSize + HEADER_SIZE;
}
}
private int bytesToInt(){
ByteBuffer bb = ByteBuffer.wrap(headerData);
int result = bb.getInt();
return result;
}
public void resetHeader() {
for(int i=0; i < HEADER_SIZE; i++){
headerData[i] = -1;
}
}
}
Ok, so whenever i send multiple packets from the client(two "commands" sent as one tcp stream) after pausing at a breakpoint and then continuing the value of socketData.packetTotalSize in ProtocolWorker:queueData() has changed for an example and socketData.tmp has also changed from eg. [A,B,C] to [A,B,C,A,B,C]
Am i not synchronizing the socketData correctly? What could be wrong?
Posting Permissions
- You may not post new threads
- You may not post replies
- You may not post attachments
- You may not edit your posts
-
Forum Rules
|
Click Here to Expand Forum to Full Width
|