001/*
002 * Copyright 2012-2014 Donghwan Kim
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.github.flowersinthesand.portal;
017
018import java.io.Serializable;
019import java.util.Collections;
020import java.util.LinkedHashMap;
021import java.util.Map;
022
023import io.github.flowersinthesand.wes.Action;
024import io.github.flowersinthesand.wes.Actions;
025import io.github.flowersinthesand.wes.ConcurrentActions;
026
027/**
028 * {@link Server} implementation for clustering.
029 * <p>
030 * This implementation follows the publish and subscribe model from Java Message
031 * Service (JMS) to support clustering. Here, the message represents invocation
032 * of socket action and is created when one of selector actions is called. The
033 * publisher should publish the message passed from
034 * {@link ClusteredServer#publishAction(Action)} to all nodes in cluster and the
035 * subscriber should propagate a message sent from one of node in cluster to
036 * {@link ClusteredServer#messageAction()}.
037 * <p>
038 * An invocation of the following socket finder actions will propagate to all
039 * the server in cluster:
040 * <ul>
041 * <li>{@link ClusteredServer#all(Action)}</li>
042 * <li>{@link ClusteredServer#byId(Action)}</li>
043 * <li>{@link ClusteredServer#byTag(Action)}</li>
044 * </ul>
045 * That means {@code server.all(action)} executes a given action with not only
046 * all the sockets in this server but also all the sockets in all the other
047 * servers in the cluster.
048 * <p>
049 * Accordingly, most of Message Oriented Middlware requires message to be
050 * serialized and you may have to have pass {@link Action} implementing
051 * {@link Serializable} on method call. See the provided link, serialization of
052 * inner classes including local and anonymous classes, is discouraged and
053 * doesn't work in some cases. Therefore, always use {@link Sentence} instead of action
054 * if possible.
055 * 
056 * @author Donghwan Kim
057 * @see Sentence
058 * @see <a
059 *      href="http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serial-arch.html#4539">Note
060 *      of the Serializable Interface</a>
061 */
062public class ClusteredServer extends DefaultServer {
063
064    private Actions<Map<String, Object>> publishActions = new ConcurrentActions<>();
065    private Action<Map<String, Object>> messageAction = new Action<Map<String, Object>>() {
066        @SuppressWarnings("unchecked")
067        @Override
068        public void on(Map<String, Object> map) {
069            String methodName = (String) map.get("method");
070            Object[] args = (Object[]) map.get("args");
071            switch (methodName) {
072            case "all":
073                ClusteredServer.super.all((Action<Socket>) args[0]);
074                break;
075            case "byId":
076                ClusteredServer.super.byId((String) args[0], (Action<Socket>) args[1]);
077                break;
078            case "byTag":
079                ClusteredServer.super.byTag((String[]) args[0], (Action<Socket>) args[1]);
080                break;
081            default:
082                throw new IllegalArgumentException("Illegal method name in processing message: " + methodName);
083            }
084        }
085    };
086
087    @Override
088    public Server all(Action<Socket> action) {
089        publishMessage("all", action);
090        return this;
091    }
092
093    @Override
094    public Server byId(String id, Action<Socket> action) {
095        publishMessage("byId", id, action);
096        return this;
097    }
098
099    @Override
100    public Server byTag(String[] names, Action<Socket> action) {
101        publishMessage("byTag", names, action);
102        return this;
103    }
104
105    private void publishMessage(String method, Object... args) {
106        Map<String, Object> map = new LinkedHashMap<>();
107        map.put("method", method);
108        map.put("args", args);
109        publishActions.fire(Collections.unmodifiableMap(map));
110    }
111
112    /**
113     * Attaches an action to be called with a map containing method name and
114     * arguments of socket action when it's called.
115     */
116    public Server publishAction(Action<Map<String, Object>> action) {
117        publishActions.add(action);
118        return this;
119    }
120
121    /**
122     * This action receives a map fired from one of node in cluster and invokes
123     * socket action in this server.
124     */
125    public Action<Map<String, Object>> messageAction() {
126        return messageAction;
127    }
128
129}