001/* 002 * Copyright 2012-2014 Donghwan Kim 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.github.flowersinthesand.portal; 017 018import java.io.Serializable; 019import java.util.Collections; 020import java.util.LinkedHashMap; 021import java.util.Map; 022 023import io.github.flowersinthesand.wes.Action; 024import io.github.flowersinthesand.wes.Actions; 025import io.github.flowersinthesand.wes.ConcurrentActions; 026 027/** 028 * {@link Server} implementation for clustering. 029 * <p> 030 * This implementation follows the publish and subscribe model from Java Message 031 * Service (JMS) to support clustering. Here, the message represents invocation 032 * of socket action and is created when one of selector actions is called. The 033 * publisher should publish the message passed from 034 * {@link ClusteredServer#publishAction(Action)} to all nodes in cluster and the 035 * subscriber should propagate a message sent from one of node in cluster to 036 * {@link ClusteredServer#messageAction()}. 037 * <p> 038 * An invocation of the following socket finder actions will propagate to all 039 * the server in cluster: 040 * <ul> 041 * <li>{@link ClusteredServer#all(Action)}</li> 042 * <li>{@link ClusteredServer#byId(Action)}</li> 043 * <li>{@link ClusteredServer#byTag(Action)}</li> 044 * </ul> 045 * That means {@code server.all(action)} executes a given action with not only 046 * all the sockets in this server but also all the sockets in all the other 047 * servers in the cluster. 048 * <p> 049 * Accordingly, most of Message Oriented Middlware requires message to be 050 * serialized and you may have to have pass {@link Action} implementing 051 * {@link Serializable} on method call. See the provided link, serialization of 052 * inner classes including local and anonymous classes, is discouraged and 053 * doesn't work in some cases. Therefore, always use {@link Sentence} instead of action 054 * if possible. 055 * 056 * @author Donghwan Kim 057 * @see Sentence 058 * @see <a 059 * href="http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serial-arch.html#4539">Note 060 * of the Serializable Interface</a> 061 */ 062public class ClusteredServer extends DefaultServer { 063 064 private Actions<Map<String, Object>> publishActions = new ConcurrentActions<>(); 065 private Action<Map<String, Object>> messageAction = new Action<Map<String, Object>>() { 066 @SuppressWarnings("unchecked") 067 @Override 068 public void on(Map<String, Object> map) { 069 String methodName = (String) map.get("method"); 070 Object[] args = (Object[]) map.get("args"); 071 switch (methodName) { 072 case "all": 073 ClusteredServer.super.all((Action<Socket>) args[0]); 074 break; 075 case "byId": 076 ClusteredServer.super.byId((String) args[0], (Action<Socket>) args[1]); 077 break; 078 case "byTag": 079 ClusteredServer.super.byTag((String[]) args[0], (Action<Socket>) args[1]); 080 break; 081 default: 082 throw new IllegalArgumentException("Illegal method name in processing message: " + methodName); 083 } 084 } 085 }; 086 087 @Override 088 public Server all(Action<Socket> action) { 089 publishMessage("all", action); 090 return this; 091 } 092 093 @Override 094 public Server byId(String id, Action<Socket> action) { 095 publishMessage("byId", id, action); 096 return this; 097 } 098 099 @Override 100 public Server byTag(String[] names, Action<Socket> action) { 101 publishMessage("byTag", names, action); 102 return this; 103 } 104 105 private void publishMessage(String method, Object... args) { 106 Map<String, Object> map = new LinkedHashMap<>(); 107 map.put("method", method); 108 map.put("args", args); 109 publishActions.fire(Collections.unmodifiableMap(map)); 110 } 111 112 /** 113 * Attaches an action to be called with a map containing method name and 114 * arguments of socket action when it's called. 115 */ 116 public Server publishAction(Action<Map<String, Object>> action) { 117 publishActions.add(action); 118 return this; 119 } 120 121 /** 122 * This action receives a map fired from one of node in cluster and invokes 123 * socket action in this server. 124 */ 125 public Action<Map<String, Object>> messageAction() { 126 return messageAction; 127 } 128 129}