Peer 2 Peer networking and web service
1- Requirements
Using java language through eclipse.
Working in Windows.
Be able to extent the program to work as a web service.
Be able to extent the program to work in Raspberry Pi.
2- The program
Is Peer-2-Peer decentralised distributed system nodes.
Each node has both status (Client and Server).
Each node has bi-directional connection with its neighbours. For example let say we have 4 nodes {A,B,C,D} the connection should be fixable, be able to change the connection between node, like following
So the connection can take any from above figure and more.
Each node can represent independent java class, has the same code but different in function.
Each node assigned to specific ID and Attributers.
Each node has a full knowledge about all nodes in distributed system and their function even if there’s no direct connection between them.
3- Node
As we mentioned its independent class in java
The good points that all nodes has the same code but different in ID, Attributes and Node function for example the following figure shows
The connection is based on socket (send and receive) with other nodes.
Each node has a direct connection to its own database.
The function is based on database to do the appropriate task.
4- Processing
Each node has able to send the request query to any other nodes.
If there’s no direct connection between the sender and receiver, the data will pass through the intermediate nodes before arrive to the intended node (receiver). For example in Figure 1 –a- if the source node is A and the intended node is D then the data will pass both B and C before arriving to D.
The objective of using Attributes is to apply Attribute based access control policy in this distributed system. Please look this web site it will be helpful.
Be able to use intermediate nodes to do partial processing before arriving to the intended node.
According to source node request will wait a respond or just save the result in intended node.
5- Example
Let say we have the following network
The task is Node A has its own database, use to save the salary table for the employees. The salary table include personal information for the employees and these information will be used in other nodes to calculate the salary.
Node A
Node A send request query to do a process on its data table. Like
Node A will determine the processing path according to what function required. ( all path will be saved, Node A will select the shortest path by using Depth First Search)
Push the processing nodes ID in a stack.
Send the DB table to next node.
If the next node is matching the given attributes then give a permission to this node to access to the data otherwise, use this node like a bridge to access to the next node.
The scenario here, the data will be flooded into system nodes, check each node if it matching what the attribute then give access permission otherwise move to next node (p2p decentralised concept).
Now data is going out from Node A to B.
Node B ( matched the attribute and it’s the top of stack PathStack)
Its function is to make a query to its own database using the id’s of the table records (Name, Address,…) to retrieve the price of a working hour. After that calculate the Salary by= No.Hours x Price.Each H. after that save the Salary result in table for each record individually. In this case (Bank AccoundNo. & Bank SourceCo.) columns will be Inaccessible by Node B ( Using Attribute Based Access control) As shown in next figure.
After that the table will leave node B with salary values to next node.
In this case will be Node D or E. Both nodes in this case are used as a bridge not processing node because they don’t match the ID and the attribute.
Node F
Is the final node in processing path ( processing path not mean all network path so may E or D or B or any node in some cases will be the final node in the processing path) see next section. Node F will be given a persmison to access to the table and do its task. The task her is just user Bank AccountNo and Bank SourceCo to do money transaction and add “Approved” in Money transection column.
After all the above processing the final table will return back to node A as a response to its request. And will take the following final result to save it in Node A databse.
And all information in the table are accessible and clear to node A
When NodeF send the final result to nodeA the data will be inaccessible to all nodes between F and A and use them as Bridge.
Processing paths and network routes
Processing path is the path that represents the node that will do the task ( processing node ) not (bridge nodes) for example:
Route path is the path that holds all the node in the network and what at the node required to conntect between two system nodes
For example
In out previous example, the path/ route between
AF = { A,B,D,F}, {A,B,E,F},{A,B,D,E,F},{A,B,E,D,F},{A,C,E,F},{A,C,E,D,F} ….
However, the processing path is just {A,B,F}
Using DFS to determine the shortest path
In our example we need from intermediate nodes to do partial processing on data so the DFS will applied as follows :
The processing requires two nodes to accomplish the task, node B,F so we calculate the path between AB, path BF according to what the intermediate nodes and what we need from them.
Solution
NodeBehaviour.java
package hw.com;
// interface represents the behaviour of a node.
public interface NodeBehaviour {
Data handle(Data data);
}
AccessEvaluator.java
package hw.com;
import java.util.List;
public class AccessEvaluator {
/*
* Check if the attributes are allowed to change in the data.
*/
public boolean hasPermission(Data data, final List<String> attributes) {
for (String attribute : data.getAttributes()) {
if (attributes.contains(attribute))
return true;
}
return false;
}
}
Data.java
package hw.com;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
public class Data implements Serializable {
/*
* The attributes that represent the roles that can modify this data.
*/
private List<String> attributes;
/*
* The database entries.
*/
private Map<String, List<String>> fields;
public Data(List<String> attributes, Map<String, List<String>> fields) {
this.attributes = attributes;
this.fields = fields;
}
public List<String> getAttributes() {
return attributes;
}
public void setAttributes(List<String> attributes) {
this.attributes = attributes;
}
public Map<String, List<String>> getFields() {
return fields;
}
public void setFields(Map<String, List<String>> fields) {
this.fields = fields;
}
}
DataSerializer.java
package hw.com;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class DataSerializer {
/*
* serialize the data object to be transferd in the network
*/
public String serialize(Data data) {
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream so = new ObjectOutputStream(bo);
so.writeObject(data);
so.flush();
return bo.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/*
* deserialize the data received in the network
*/
public Data deserialize(String serialized) {
try {
byte b[] = serialized.getBytes();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
ObjectInputStream si = new ObjectInputStream(bi);
return (Data) si.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
DatabaseConnection.java
package hw.com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class DatabaseConnection {
private static final String DATABASE_NAME = “hw”;
private static DatabaseConnection instance;
private Connection connection;
private DatabaseConnection() {
try {
Class.forName(“com.mysql.jdbc.Driver”);
connection = DriverManager.getConnection(“jdbc:mysql://localhost/” + DATABASE_NAME, “root”, “”);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/*
* creates a new sql statement to execute sql queries.
*/
public Statement createStatement() throws SQLException {
return connection.createStatement();
}
/*
* retrieves a singleton.
*/
public synchronized static DatabaseConnection getInstance() {
if (instance == null) {
instance = new DatabaseConnection();
}
return instance;
}
}
SalaryBehaviour.java
package hw.com;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
public class SalaryBehaviour implements NodeBehaviour {
public static final int PRICE_PER_HOUR = 80;
// update the employee’s salary .
public Data handle(Data data) {
try {
List<String> nos = data.getFields().get(“NO”);
List<String> workingHours = data.getFields().get(“No. working Hours”);
for (int i = 0; i < nos.size(); i++) {
String no = nos.get(i);
String workingHour = workingHours.get(i);
Statement updateStatement = DatabaseConnection.getInstance().createStatement();
updateStatement.execute(“UPDATE Employee SET (Salary) VALUES (” + calculateSalary(workingHour) + “) WHERE NO = ” + no);
}
} catch (SQLException e) {
e.printStackTrace();
}
return data;
}
private String calculateSalary(String workingHour) {
return Integer.toString(Integer.parseInt(workingHour) * PRICE_PER_HOUR);
}
}
NodeConnection.java
package hw.com;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
public class NodeConnection {
private Node node;
private String ipAddress;
private DataOutputStream output;
private DataInputStream input;
private boolean running = false;
private Socket outSocket;
private Socket inSocket;
private Thread inputThread;
public NodeConnection(String ipAddress, Node node) {
this.ipAddress = ipAddress;
this.node = node;
}
public void connect() throws IOException {
outSocket = node.createSocket(ipAddress);
output = new DataOutputStream(outSocket.getOutputStream());
send(node.getId());
}
public Node asNode() {
return node;
}
public void send(String data) throws IOException {
output.writeUTF(data);
output.flush();
}
public void setInput(Socket inSocket, DataInputStream input) {
this.inSocket = inSocket;
this.input = input;
}
public void listenForInputs() {
running = true;
System.out.println(“Hello”);
inputThread = new Thread(new Runnable() {
public void run() {
while (running) {
try {
String data = input.readUTF();
System.out.println(data);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
try {
inSocket.close();
outSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
inputThread.start();
}
public void close() throws InterruptedException {
running = false;
//inputThread.join();
}
}
Application.java
package hw.com;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Application {
public static final String HOST = “localhost”;
public static void main(String[] args) throws Exception {
System.out.println(“———— Start ————“);
Node a = new Node();
Node b = new Node();
Node c = new Node();
Node d = new Node();
Node e = new Node();
Node f = new Node();
Map<String, List<Node>> allNodes = createNodes(HOST, a, b, c, d, e, f);
NodeBehaviour nodeBehaviour = new NodeBehaviour() {
public Data handle(Data data) {
return data;
}
};
a.create(9000, allNodes, createNodes(HOST, b, c), new DatabaseFetchBehaviour());
b.create(9001, allNodes, createNodes(HOST, e, d), new SalaryBehaviour());
c.create(9002, allNodes, createNodes(HOST, a, e), nodeBehaviour);
d.create(9003, allNodes, createNodes(HOST, b, e, f), nodeBehaviour);
e.create(9004, allNodes, createNodes(HOST, b, c, f), nodeBehaviour);
f.create(9005, allNodes, createNodes(HOST, d, e), nodeBehaviour);
a.connectToNeighbors();
b.connectToNeighbors();
c.connectToNeighbors();
d.connectToNeighbors();
e.connectToNeighbors();
f.connectToNeighbors();
a.send(“ss”, new Data(Arrays.asList(“One”, “Two”), new HashMap<String, List<String>>() {{
put(“field”, Arrays.asList(“value”));
}}));
a.close();
b.close();
c.close();
d.close();
e.close();
f.close();
System.out.println(“————- End ————-“);
}
private static void createNodes(Map<String, List<Node>> nodesMap, String host, Node… nodes) {
List<Node> nodesList = new ArrayList();
for (Node node : nodes) {
nodesList.add(node);
}
nodesMap.put(host, nodesList);
}
private static Map<String, List<Node>> createNodes(String host, Node… nodes) {
Map<String, List<Node>> nodesMap = new HashMap();
createNodes(nodesMap, host, nodes);
return nodesMap;
}
}
DatabaseFetchBehaviour.java
package hw.com;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DatabaseFetchBehaviour implements NodeBehaviour {
/*
* creates a new data to be transfered with the database fields load in.
*/
public Data handle(Data data) {
Map<String, List<String>> fields = new HashMap();
try {
Statement statement = DatabaseConnection.getInstance().createStatement();
statement.execute(“SELECT * FROM Employee”);
ResultSet resultSet = statement.getResultSet();
List<String> no = new ArrayList();
List<String> name = new ArrayList();
List<String> address = new ArrayList();
List<String> workingHours = new ArrayList();
List<String> salary = new ArrayList();
List<String> bankAccount = new ArrayList();
List<String> bankSource = new ArrayList();
List<String> moneyTransaction = new ArrayList();
while (resultSet.next()) {
no.add(resultSet.getString(“NO”));
name.add(resultSet.getString(“Name”));
address.add(resultSet.getString(“Address”));
workingHours.add(resultSet.getString(“No. working Hours”));
salary.add(resultSet.getString(“Salary”));
bankAccount.add(resultSet.getString(“Bank AccountNo”));
bankSource.add(resultSet.getString(“Bank SourceCo”));
moneyTransaction.add(resultSet.getString(“Money transaction”));
}
fields.put(“NO”, no);
fields.put(“Name”, name);
fields.put(“Address”, address);
fields.put(“No. working Hours”, workingHours);
fields.put(“Salary”, salary);
fields.put(“Bank AccountNo”, bankAccount);
fields.put(“Bank SourceCo”, bankSource);
fields.put(“Money transaction”, moneyTransaction);
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
return new Data(data.getAttributes(), fields);
}
}
Node.java
package hw.com;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import static java.util.Map.Entry;
import static java.util.UUID.randomUUID;
public class Node {
private String id = randomUUID().toString();
private int port;
private ServerSocket serverSocket;
private Node parent;
private List<NodeConnection> neighbors;
private List<NodeConnection> allNodes;
private List<String> attributes = Collections.emptyList();
private NodeBehaviour nodeBehaviour;
private DatabaseConnection databaseConnection;
private AccessEvaluator accessEvaluator = new AccessEvaluator();
/*
* initialize a node.
*/
public void create(int port, Map<String, List<Node>> allNodesByHost, Map<String, List<Node>> neighborsByHost, NodeBehaviour nodeBehaviour) throws IOException {
this.port = port;
this.nodeBehaviour = nodeBehaviour;
serverSocket = new ServerSocket(port);
// registe it’s peers.
new Thread(new Runnable() {
public void run() {
try {
Socket inSocket = serverSocket.accept();
DataInputStream input = new DataInputStream(inSocket.getInputStream());
String peerId = input.readUTF();
for (NodeConnection neighbor : neighbors) {
if (neighbor.asNode().id.equals(peerId)) {
neighbor.setInput(inSocket, input);
neighbor.listenForInputs();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
this.allNodes = new ArrayList();
this.neighbors = new ArrayList();
for (Entry<String, List<Node>> nodeList : allNodesByHost.entrySet()) {
for (Node node : nodeList.getValue()) {
allNodes.add(new NodeConnection(nodeList.getKey(), node));
}
}
for (Entry<String, List<Node>> nodeList : neighborsByHost.entrySet()) {
for (Node node : nodeList.getValue()) {
neighbors.add(new NodeConnection(nodeList.getKey(), node));
}
}
}
public void close() throws IOException, InterruptedException {
serverSocket.close();
for (NodeConnection node : neighbors) {
node.close();
}
}
// send data to a node with the unique id receiverNodeId.
public void send(String receiverNodeId, Data data) throws IOException {
// check if has permissions.
if (accessEvaluator.hasPermission(data, attributes)) {
// if has permissions then perform work on the data.
data = nodeBehaviour.handle(data);
}
// send data after serialization.
send(receiverNodeId, new DataSerializer().serialize(data));
}
private void send(String receiverNodeId, String data) throws IOException {
for (NodeConnection node : neighbors) {
if (receiverNodeId.equals(node.asNode().id)) {
sendTo(node, data);
return;
}
}
for (NodeConnection node : allNodes) {
if (receiverNodeId.equals(node.asNode().id)) {
Node nextHop = breadthFirstSearchForNextHop(node.asNode());
sendTo(nextHop, data);
return;
}
}
}
private void sendTo(Node node, String data) throws IOException {
for (NodeConnection neighbor : neighbors) {
if (neighbor.asNode() == node) {
neighbor.send(data);
break;
}
}
}
private void sendTo(NodeConnection node, String data) throws IOException {
node.send(data);
}
// create a connect to this node’s neighbors.
public void connectToNeighbors() throws IOException {
for (NodeConnection node : neighbors) {
node.connect();
}
}
public Socket createSocket(String peerIpAddress) throws IOException {
return new Socket(peerIpAddress, port);
}
// execute Breadth First Search to find the nearest neighbor to the destination.
private Node breadthFirstSearchForNextHop(Node goalNode) {
LinkedList<Node> closedList = new LinkedList();
LinkedList<Node> openList = new LinkedList();
openList.add(this);
parent = null;
while (!openList.isEmpty()) {
Node node = openList.removeFirst();
if (node == goalNode) {
LinkedList<Node> path = new LinkedList();
while (node.parent != null) {
path.addFirst(node);
node = node.parent;
}
return path.get(0);
} else {
closedList.add(node);
Iterator<NodeConnection> i = node.neighbors.iterator();
while (i.hasNext()) {
Node neighborNode = i.next().asNode();
if (!closedList.contains(neighborNode) && !openList.contains(neighborNode)) {
neighborNode.parent = node;
openList.add(neighborNode);
}
}
}
}
return null;
}
public String getId() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Node node = (Node) o;
return id != null ? id.equals(node.id) : node.id == null;
}
@Override
public int hashCode() {
return id != null ? id.hashCode() : 0;
}
}