001/*
002 * Copyright 2012-2014 Donghwan Kim
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.github.flowersinthesand.portal;
017
018import io.github.flowersinthesand.wes.Action;
019import io.github.flowersinthesand.wes.Actions;
020import io.github.flowersinthesand.wes.ConcurrentActions;
021import io.github.flowersinthesand.wes.Data;
022import io.github.flowersinthesand.wes.HttpStatus;
023import io.github.flowersinthesand.wes.ServerHttpExchange;
024import io.github.flowersinthesand.wes.ServerWebSocket;
025import io.github.flowersinthesand.wes.VoidAction;
026
027import java.io.IOException;
028import java.io.UnsupportedEncodingException;
029import java.net.URI;
030import java.net.URLDecoder;
031import java.nio.CharBuffer;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.LinkedHashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.Timer;
040import java.util.TimerTask;
041import java.util.UUID;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ConcurrentMap;
044import java.util.concurrent.CopyOnWriteArraySet;
045import java.util.concurrent.atomic.AtomicBoolean;
046import java.util.concurrent.atomic.AtomicReference;
047import java.util.regex.Matcher;
048import java.util.regex.Pattern;
049
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import com.fasterxml.jackson.core.JsonProcessingException;
054import com.fasterxml.jackson.core.type.TypeReference;
055import com.fasterxml.jackson.databind.ObjectMapper;
056
057/**
058 * Default implementation of {@link Server}.
059 * <p>
060 * This implementation provides and manages {@link Socket} processing HTTP
061 * request and WebSocket following the portal protocol.
062 * <p>
063 * As options, the following methods can be overridden.
064 * <ul>
065 * <li>{@link DefaultServer#parseURI(String)}
066 * <li>{@link DefaultServer#parseEvent(String)}
067 * <li>{@link DefaultServer#stringifyEvent(Map)}
068 * </ul> 
069 * 
070 * @author Donghwan Kim
071 */
072public class DefaultServer implements Server {
073
074    private final Logger log = LoggerFactory.getLogger(DefaultServer.class);
075    private ConcurrentMap<String, DefaultSocket> sockets = new ConcurrentHashMap<>();
076    private Actions<Socket> socketActions = new ConcurrentActions<>();
077
078    private Action<ServerHttpExchange> httpAction = new Action<ServerHttpExchange>() {
079        @Override
080        public void on(final ServerHttpExchange http) {
081            switch (http.method()) {
082            case "GET":
083                Map<String, String> params = parseURI(http.uri());
084                setNocache(http);
085                setCors(http);
086                switch (params.get("when")) {
087                case "open":
088                    switch (params.get("transport")) {
089                    case "sse":
090                    case "streamxhr":
091                    case "streamxdr":
092                    case "streamiframe":
093                        socketActions.fire(new DefaultSocket(new StreamTransport(params, http)));
094                        break;
095                    case "longpollajax":
096                    case "longpollxdr":
097                    case "longpolljsonp":
098                        socketActions.fire(new DefaultSocket(new LongpollTransport(params, http)));
099                        break;
100                    default:
101                        log.error("Transport, {}, is not supported", params.get("transport"));
102                        http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
103                    }
104                    break;
105                case "poll": {
106                    String id = params.get("id");
107                    DefaultSocket socket = sockets.get(id);
108                    if (socket != null) {
109                        Transport transport = socket.transport;
110                        if (transport instanceof LongpollTransport) {
111                            ((LongpollTransport) transport).refresh(http);
112                        } else {
113                            log.error("Non-long polling transport#{} sent poll request", id);
114                            http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
115                        }
116                    } else {
117                        log.error("Long polling transport#{} is not found in poll request", id);
118                        http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
119                    }
120                    break; }
121                case "abort": {
122                    String id = params.get("id");
123                    Socket socket = sockets.get(id);
124                    if (socket != null) {
125                        socket.close();
126                    }
127                    http.setResponseHeader("content-type", "text/javascript; charset=utf-8").close();
128                    break; }
129                default:
130                    log.error("when, {}, is not supported", params.get("when"));
131                    http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
132                    break;
133                }
134                break;
135            case "POST":
136                setNocache(http);
137                setCors(http);
138                http.bodyAction(new Action<Data>() {
139                    @Override
140                    public void on(Data body) {
141                        String data = body.as(String.class).substring("data=".length());
142                        String id = findSocketId(data);
143                        
144                        DefaultSocket socket = sockets.get(id);
145                        if (socket != null) {
146                            Transport transport = socket.transport;
147                            if (transport instanceof HttpTransport) {
148                                transport.messageActions.fire(data);
149                            } else {
150                                log.error("Non-HTTP socket#{} receives a POST message", id);
151                                http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
152                            }
153                        } else {
154                            log.error("A POST message arrived but no socket#{} is found", id);
155                            http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
156                        }
157                        http.close();
158                    };
159                    
160                    private String findSocketId(String text) {
161                        Matcher matcher = Pattern.compile("\"socket\":\"([^\"]+)\"").matcher(text);
162                        matcher.find();
163                        return matcher.group(1);
164                    }
165                });
166                break;
167            default:
168                log.error("HTTP method, {}, is not supported", http.method());
169                http.setStatus(HttpStatus.METHOD_NOT_ALLOWED).close();
170                break;
171            }
172        }
173        
174        private void setNocache(ServerHttpExchange http) {
175            http
176            .setResponseHeader("cache-control", "no-cache, no-store, must-revalidate")
177            .setResponseHeader("pragma", "no-cache")
178            .setResponseHeader("expires", "0");
179        }
180        
181        private void setCors(ServerHttpExchange http) {
182            String origin = http.requestHeader("origin");
183            String acrh = http.requestHeader("access-control-request-headers");
184            http
185            .setResponseHeader("access-control-allow-origin", origin != null ? origin : "*")
186            .setResponseHeader("access-control-allow-credentials", "true");
187            if (acrh != null) {
188                http.setResponseHeader("access-control-allow-headers", acrh);
189            }
190        }
191    };
192
193    private Action<ServerWebSocket> websocketAction = new Action<ServerWebSocket>() {
194        @Override
195        public void on(ServerWebSocket ws) {
196            Map<String, String> params = parseURI(ws.uri());
197            socketActions.fire(new DefaultSocket(new WebSocketTransport(params, ws)));
198        }
199    };
200
201    /**
202     * Takes a portal URI and returns a map of parameters.
203     * <p>
204     * This is a counterpart of {@code urlBuilder} of client option.
205     */
206    protected Map<String, String> parseURI(String uri) {
207        Map<String, String> map = new LinkedHashMap<>();
208        String query = URI.create(uri).getQuery();
209        if ((query == null) || (query.equals(""))) {
210            return map;
211        }
212
213        String[] params = query.split("&");
214        for (String param : params) {
215            try {
216                String[] pair = param.split("=", 2);
217                String name = URLDecoder.decode(pair[0], "UTF-8");
218                if (name == "") {
219                    continue;
220                }
221
222                map.put(name, pair.length > 1 ? URLDecoder.decode(pair[1], "UTF-8") : "");
223            } catch (UnsupportedEncodingException e) {}
224        }
225
226        return Collections.unmodifiableMap(map);
227    }
228
229    /**
230     * Takes a stringified event and returns an event object.
231     * <p>
232     * A text in argument is generated by {@code outbound} of client option and
233     * this is akin to {@code inbound} of client option.
234     */
235    protected Map<String, Object> parseEvent(String text) {
236        try {
237            return new ObjectMapper().readValue(text, new TypeReference<Map<String, Object>>() {});
238        } catch (IOException e) {
239            throw new RuntimeException(e);
240        }
241    }
242
243    /**
244     * Takes an event object and returns a stringified event.
245     * <p>
246     * This is akin to {@code outbound} of client option and a returned value will
247     * be handled by {@code inbound} of client option.
248     */
249    protected String stringifyEvent(Map<String, Object> event) {
250        try {
251            return new ObjectMapper().writeValueAsString(event);
252        } catch (JsonProcessingException e) {
253            throw new RuntimeException(e);
254        }
255    }
256    
257    @Override
258    public Server all(Action<Socket> action) {
259        for (Socket socket : sockets.values()) {
260            action.on(socket);
261        }
262        return this;
263    }
264
265    @Override
266    public Server byId(String id, Action<Socket> action) {
267        Socket socket = sockets.get(id);
268        if (socket != null) {
269            action.on(socket);
270        }
271        return this;
272    }
273
274    @Override
275    public Server byTag(String name, Action<Socket> action) {
276        return byTag(new String[] { name }, action);
277    }
278
279    @Override
280    public Server byTag(String[] names, Action<Socket> action) {
281        List<String> nameList = Arrays.asList(names);
282        for (Socket socket : sockets.values()) {
283            if (socket.tags().containsAll(nameList)) {
284                action.on(socket);
285            }
286        }
287        return this;
288    }
289
290    @Override
291    public Server socketAction(Action<Socket> action) {
292        socketActions.add(action);
293        return this;
294    }
295
296    @Override
297    public Action<ServerHttpExchange> httpAction() {
298        return httpAction;
299    }
300
301    @Override
302    public Action<ServerWebSocket> websocketAction() {
303        return websocketAction;
304    }
305
306    private abstract class Transport {
307        final Map<String, String> params;
308        Actions<String> messageActions = new ConcurrentActions<>();
309        Actions<Void> closeActions = new ConcurrentActions<>();
310
311        Transport(Map<String, String> params) {
312            this.params = params;
313        }
314        
315        abstract String uri();
316        abstract void send(String data);
317        abstract void close();
318    }
319
320    private class WebSocketTransport extends Transport {
321        final ServerWebSocket ws;
322        
323        WebSocketTransport(Map<String, String> params, ServerWebSocket ws) {
324            super(params);
325            this.ws = ws;
326            ws.closeAction(new VoidAction() {
327                @Override
328                public void on() {
329                    closeActions.fire();
330                }
331            })
332            .messageAction(new Action<Data>() {
333                @Override
334                public void on(Data data) {
335                    messageActions.fire(data.as(String.class));
336                }
337            });
338        }
339        
340        @Override
341        String uri() {
342            return ws.uri();
343        }
344
345        @Override
346        synchronized void send(String data) {
347            ws.send(data);
348        }
349
350        @Override
351        synchronized void close() {
352            ws.close();
353        }
354    }
355    
356    private abstract class HttpTransport extends Transport {
357        final ServerHttpExchange http;
358        
359        HttpTransport(Map<String, String> params, ServerHttpExchange http) {
360            super(params);
361            this.http = http;
362        }
363        
364        @Override
365        String uri() {
366            return http.uri();
367        }
368    }
369
370    final static String text2KB = CharBuffer.allocate(2048).toString().replace('\0', ' ');
371    
372    private class StreamTransport extends HttpTransport {
373        final boolean isAndroidLowerThan3;
374        
375        StreamTransport(Map<String, String> params, ServerHttpExchange http) {
376            super(params, http);
377            String ua = http.requestHeader("user-agent");
378            this.isAndroidLowerThan3 = ua == null ? false : ua.matches(".*Android\\s[23]\\..*");
379            http.closeAction(new VoidAction() {
380                @Override
381                public void on() {
382                    closeActions.fire();
383                }
384            })
385            .setResponseHeader("content-type",
386                "text/" + (params.get("transport").equals("sse") ? "event-stream" : "plain") + "; charset=utf-8")
387            .write((isAndroidLowerThan3 ? text2KB : "") + text2KB + "\n");
388        }
389        
390        @Override
391        synchronized void send(String data) {
392            String payload = (isAndroidLowerThan3 ? text2KB + text2KB : "") + "";
393            for (String datum : data.split("\r\n|\r|\n")) {
394                payload += "data: " + datum + "\n";
395            }
396            payload += "\n";
397            http.write(payload);
398        }
399        
400        @Override
401        synchronized void close() {
402            http.close();
403        }
404    }
405    
406    private class LongpollTransport extends HttpTransport {
407        AtomicReference<ServerHttpExchange> httpRef = new AtomicReference<>();
408        AtomicBoolean closed = new AtomicBoolean();
409        AtomicBoolean written = new AtomicBoolean();
410        Set<String> buffer = new CopyOnWriteArraySet<>(); 
411        AtomicReference<Timer> closeTimer = new AtomicReference<>();
412        
413        LongpollTransport(Map<String, String> params, ServerHttpExchange http) {
414            super(params, http);
415            refresh(http);
416        }
417
418        void refresh(ServerHttpExchange http) {
419            final Map<String, String> parameters = parseURI(http.uri());
420            http.closeAction(new VoidAction() {
421                @Override
422                public void on() {
423                    closed.set(true);
424                    if (parameters.get("when").equals("poll") && !written.get()) {
425                        closeActions.fire();
426                    }
427                    Timer timer = new Timer(true);
428                    timer.schedule(new TimerTask() {
429                        @Override
430                        public void run() {
431                            closeActions.fire();
432                        }
433                    }, 500);
434                    closeTimer.set(timer);
435                }
436            })
437            .setResponseHeader("content-type", 
438                "text/" + (params.get("transport").equals("longpolljsonp") ? "javascript" : "plain") + "; charset=utf-8");
439            
440            if (parameters.get("when").equals("open")) {
441                http.close();
442            } else {
443                httpRef.set(http);
444                closed.set(false);
445                written.set(false);
446                Timer timer = closeTimer.getAndSet(null);
447                if (timer != null) {
448                    timer.cancel();
449                }
450                if (parameters.containsKey("lastEventIds")) {
451                    String[] lastEventIds = parameters.get("lastEventIds").split(",");
452                    for (String eventId : lastEventIds) {
453                        for (String message : buffer) {
454                            if (eventId.equals(findEventId(message))) {
455                                buffer.remove(message);
456                            }
457                        }
458                    }
459                    if (!buffer.isEmpty()) {
460                        Iterator<String> iterator = buffer.iterator();
461                        String string = iterator.next();
462                        while (iterator.hasNext()) {
463                            string += "," + iterator.next();
464                        }
465                        send("[" + string + "]");
466                    }
467                }
468            }
469        }
470
471        private String findEventId(String text) {
472            Matcher matcher = Pattern.compile("\"id\":\"([^\"]+)\"").matcher(text);
473            matcher.find();
474            return matcher.group(1);
475        }
476
477        @Override
478        synchronized void send(String data) {
479            if (!data.startsWith("[")) {
480                buffer.add(data);
481            }
482            ServerHttpExchange http = httpRef.getAndSet(null);
483            if (http != null && !closed.get()) {
484                written.set(true);
485                String payload;
486                try {
487                    payload = params.get("transport").equals("longpolljsonp") ? 
488                        params.get("callback") + "(" + new ObjectMapper().writeValueAsString(data) + ");" : 
489                        data; 
490                } catch (JsonProcessingException e) {
491                    throw new RuntimeException(e);
492                }
493                http.close(payload);
494            }
495        }
496        
497        @Override
498        synchronized void close() {
499            ServerHttpExchange http = httpRef.getAndSet(null);
500            if (http != null && !closed.get()) {
501                http.close();
502            }
503        }
504    }
505    
506    private class DefaultSocket implements Socket {
507        final Transport transport;
508        Set<String> tags = new CopyOnWriteArraySet<>();
509        ConcurrentMap<String, Actions<Object>> actionsMap = new ConcurrentHashMap<>();
510        ConcurrentMap<String, Action<Object>> replyMap = new ConcurrentHashMap<>();
511
512        DefaultSocket(final Transport transport) {
513            this.transport = transport;
514            transport.closeActions.add(new VoidAction() {
515                @Override
516                public void on() {
517                    sockets.remove(transport.params.get("id"));
518                    Actions<Object> closeActions = actionsMap.get("close");
519                    if (closeActions != null) {
520                        closeActions.fire();
521                    }
522                }
523            });
524            transport.messageActions.add(new Action<String>() {
525                @Override
526                public void on(String text) {
527                    final Map<String, Object> event = parseEvent(text);
528                    Actions<Object> actions = actionsMap.get(event.get("type"));
529                    if (actions != null) {
530                        if ((Boolean) event.get("reply")) {
531                            actions.fire(new Reply<Object>() {
532                                @Override
533                                public Object data() {
534                                    return event.get("data");
535                                }
536
537                                @Override
538                                public void done() {
539                                    done(null);
540                                }
541
542                                @Override
543                                public void done(Object value) {
544                                    sendReply(value, false);
545                                }
546
547                                @Override
548                                public void fail() {
549                                    fail(null);
550                                }
551
552                                @Override
553                                public void fail(Object value) {
554                                    sendReply(value, true);
555                                }
556
557                                AtomicBoolean sent = new AtomicBoolean();
558                                private void sendReply(Object value, boolean exception) {
559                                    if (sent.compareAndSet(false, true)) {
560                                        Map<String, Object> result = new LinkedHashMap<String, Object>();
561                                        result.put("id", event.get("id"));
562                                        result.put("data", value);
563                                        result.put("exception", exception);
564                                        send("reply", result);
565                                    }
566                                }
567                            });
568                        } else {
569                            actions.fire(event.get("data"));
570                        }
571                    }
572                }
573            });
574            on("reply", new Action<Map<String, Object>>() {
575                @Override
576                public void on(Map<String, Object> info) {
577                    Action<Object> reply = replyMap.remove(info.get("id"));
578                    if (reply != null) {
579                        reply.on(info.get("data"));
580                    } else {
581                        log.error("Reply callback not found in socket#{} with info, {}", id(), info);
582                    }
583                }
584            });
585            try {
586                new HeartbeatHelper(Long.valueOf(transport.params.get("heartbeat")));
587            } catch (NumberFormatException e) {}
588            
589            sockets.put(id(), this);
590        }
591        
592        class HeartbeatHelper {
593            final long delay;
594            AtomicReference<Timer> timer = new AtomicReference<>();
595            
596            HeartbeatHelper(long delay) {
597                this.delay = delay;
598                timer.set(createTimer());
599                on("heartbeat", new VoidAction() {
600                    @Override
601                    public void on() {
602                        timer.getAndSet(createTimer()).cancel();
603                        send("heartbeat");
604                    }
605                });
606            }
607            
608            Timer createTimer() {
609                Timer timer = new Timer(true);
610                timer.schedule(new TimerTask() {
611                    @Override
612                    public void run() {
613                        close();
614                    }
615                }, delay);
616                return timer;
617            }
618        }
619
620        @Override
621        public String id() {
622            return transport.params.get("id");
623        }
624
625        @Override
626        public String uri() {
627            return transport.uri();
628        }
629
630        @Override
631        public Set<String> tags() {
632            return tags;
633        }
634
635        @SuppressWarnings("unchecked")
636        @Override
637        public <T> Socket on(String event, Action<T> action) {
638            Actions<Object> actions = actionsMap.get(event);
639            if (actions == null) {
640                Actions<Object> value = new ConcurrentActions<>();
641                actions = actionsMap.putIfAbsent(event, value);
642                if (actions == null) {
643                    actions = value;
644                }
645            }
646            actions.add((Action<Object>) action);
647            return this;
648        }
649
650        @SuppressWarnings("unchecked")
651        @Override
652        public <T> Socket off(String event, Action<T> action) {
653            Actions<Object> actions = actionsMap.get(event);
654            if (actions != null) {
655                actions.remove((Action<Object>) action);
656            }
657            return this;
658        }
659
660        @Override
661        public Socket send(String event) {
662            return send(event, null);
663        }
664
665        @Override
666        public Socket send(String event, Object data) {
667            return send(event, data, null);
668        }
669
670        @SuppressWarnings("unchecked")
671        @Override
672        public <T> Socket send(String type, Object data, Action<T> reply) {
673            String eventId = UUID.randomUUID().toString();
674            Map<String, Object> event = new LinkedHashMap<String, Object>();
675
676            event.put("id", eventId);
677            event.put("type", type);
678            event.put("data", data);
679            event.put("reply", reply != null);
680            
681            String text = stringifyEvent(event);
682            transport.send(text);
683            if (reply != null) {
684                replyMap.put(eventId, (Action<Object>) reply);
685            }
686            return this;
687        }
688
689        @Override
690        public Socket close() {
691            transport.close();
692            return this;
693        }
694
695        @Override
696        public int hashCode() {
697            final int prime = 31;
698            int result = 1;
699            result = prime * result + ((id() == null) ? 0 : id().hashCode());
700            return result;
701        }
702
703        @Override
704        public boolean equals(Object obj) {
705            if (this == obj)
706                return true;
707            if (obj == null)
708                return false;
709            if (getClass() != obj.getClass())
710                return false;
711            DefaultSocket other = (DefaultSocket) obj;
712            if (id() == null) {
713                if (other.id() != null)
714                    return false;
715            } else if (!id().equals(other.id()))
716                return false;
717            return true;
718        }
719    }
720
721}