001/*
002 * Copyright 2012-2014 Donghwan Kim
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.github.flowersinthesand.portal;
017
018import io.github.flowersinthesand.wes.Action;
019import io.github.flowersinthesand.wes.Actions;
020import io.github.flowersinthesand.wes.ConcurrentActions;
021import io.github.flowersinthesand.wes.Data;
022import io.github.flowersinthesand.wes.HttpStatus;
023import io.github.flowersinthesand.wes.ServerHttpExchange;
024import io.github.flowersinthesand.wes.ServerWebSocket;
025import io.github.flowersinthesand.wes.VoidAction;
026
027import java.io.IOException;
028import java.io.UnsupportedEncodingException;
029import java.net.URI;
030import java.net.URLDecoder;
031import java.nio.CharBuffer;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.LinkedHashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.Timer;
040import java.util.TimerTask;
041import java.util.UUID;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ConcurrentMap;
044import java.util.concurrent.CopyOnWriteArraySet;
045import java.util.concurrent.atomic.AtomicBoolean;
046import java.util.concurrent.atomic.AtomicReference;
047import java.util.regex.Matcher;
048import java.util.regex.Pattern;
049
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import com.fasterxml.jackson.core.JsonProcessingException;
054import com.fasterxml.jackson.core.type.TypeReference;
055import com.fasterxml.jackson.databind.ObjectMapper;
056
057/**
058 * Default implementation of {@link Server}.
059 * <p>
060 * This implementation provides and manages {@link Socket} processing HTTP
061 * request and WebSocket following the portal protocol.
062 * <p>
063 * As options, the following methods can be overridden.
064 * <ul>
065 * <li>{@link DefaultServer#parseURI(String)}
066 * <li>{@link DefaultServer#parseEvent(String)}
067 * <li>{@link DefaultServer#stringifyEvent(Map)}
068 * </ul>
069 * 
070 * @author Donghwan Kim
071 */
072public class DefaultServer implements Server {
073
074    private final Logger log = LoggerFactory.getLogger(DefaultServer.class);
075    private ConcurrentMap<String, DefaultSocket> sockets = new ConcurrentHashMap<>();
076    private Actions<Socket> socketActions = new ConcurrentActions<>();
077
078    private Action<ServerHttpExchange> httpAction = new Action<ServerHttpExchange>() {
079        @Override
080        public void on(final ServerHttpExchange http) {
081            switch (http.method()) {
082            case "GET":
083                Map<String, String> params = parseURI(http.uri());
084                setNocache(http);
085                setCors(http);
086                switch (params.get("when")) {
087                case "open":
088                    switch (params.get("transport")) {
089                    case "sse":
090                    case "streamxhr":
091                    case "streamxdr":
092                    case "streamiframe":
093                        socketActions.fire(new DefaultSocket(new StreamTransport(params, http)));
094                        break;
095                    case "longpollajax":
096                    case "longpollxdr":
097                    case "longpolljsonp":
098                        socketActions.fire(new DefaultSocket(new LongpollTransport(params, http)));
099                        break;
100                    default:
101                        log.error("Transport, {}, is not supported", params.get("transport"));
102                        http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
103                    }
104                    break;
105                case "poll": {
106                    String id = params.get("id");
107                    DefaultSocket socket = sockets.get(id);
108                    if (socket != null) {
109                        Transport transport = socket.transport;
110                        if (transport instanceof LongpollTransport) {
111                            ((LongpollTransport) transport).refresh(http);
112                        } else {
113                            log.error("Non-long polling transport#{} sent poll request", id);
114                            http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
115                        }
116                    } else {
117                        log.error("Long polling transport#{} is not found in poll request", id);
118                        http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
119                    }
120                    break; }
121                case "abort": {
122                    String id = params.get("id");
123                    Socket socket = sockets.get(id);
124                    if (socket != null) {
125                        socket.close();
126                    }
127                    http.setResponseHeader("content-type", "text/javascript; charset=utf-8").close();
128                    break; }
129                default:
130                    log.error("when, {}, is not supported", params.get("when"));
131                    http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
132                    break;
133                }
134                break;
135            case "POST":
136                setNocache(http);
137                setCors(http);
138                http.bodyAction(new Action<Data>() {
139                    @Override
140                    public void on(Data body) {
141                        String data = body.as(String.class).substring("data=".length());
142                        String id = findSocketId(data);
143                        
144                        DefaultSocket socket = sockets.get(id);
145                        if (socket != null) {
146                            Transport transport = socket.transport;
147                            if (transport instanceof HttpTransport) {
148                                transport.messageActions.fire(data);
149                            } else {
150                                log.error("Non-HTTP socket#{} receives a POST message", id);
151                                http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
152                            }
153                        } else {
154                            log.error("A POST message arrived but no socket#{} is found", id);
155                            http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
156                        }
157                        http.close();
158                    };
159                    
160                    private String findSocketId(String text) {
161                        Matcher matcher = Pattern.compile("\"socket\":\"([^\"]+)\"").matcher(text);
162                        matcher.find();
163                        return matcher.group(1);
164                    }
165                });
166                break;
167            default:
168                log.error("HTTP method, {}, is not supported", http.method());
169                http.setStatus(HttpStatus.METHOD_NOT_ALLOWED).close();
170                break;
171            }
172        }
173        
174        private void setNocache(ServerHttpExchange http) {
175            http
176            .setResponseHeader("cache-control", "no-cache, no-store, must-revalidate")
177            .setResponseHeader("pragma", "no-cache")
178            .setResponseHeader("expires", "0");
179        }
180        
181        private void setCors(ServerHttpExchange http) {
182            String origin = http.requestHeader("origin");
183            String acrh = http.requestHeader("access-control-request-headers");
184            http
185            .setResponseHeader("access-control-allow-origin", origin != null ? origin : "*")
186            .setResponseHeader("access-control-allow-credentials", "true");
187            if (acrh != null) {
188                http.setResponseHeader("access-control-allow-headers", acrh);
189            }
190        }
191    };
192
193    private Action<ServerWebSocket> websocketAction = new Action<ServerWebSocket>() {
194        @Override
195        public void on(ServerWebSocket ws) {
196            Map<String, String> params = parseURI(ws.uri());
197            socketActions.fire(new DefaultSocket(new WebSocketTransport(params, ws)));
198        }
199    };
200
201    /**
202     * Takes a portal URI and returns a map of parameters.
203     * <p>
204     * This is a counterpart of {@code urlBuilder} of client option.
205     */
206    protected Map<String, String> parseURI(String uri) {
207        Map<String, String> map = new LinkedHashMap<>();
208        String query = URI.create(uri).getQuery();
209        if ((query == null) || (query.equals(""))) {
210            return map;
211        }
212
213        String[] params = query.split("&");
214        for (String param : params) {
215            try {
216                String[] pair = param.split("=", 2);
217                String name = URLDecoder.decode(pair[0], "UTF-8");
218                if (name == "") {
219                    continue;
220                }
221
222                map.put(name, pair.length > 1 ? URLDecoder.decode(pair[1], "UTF-8") : "");
223            } catch (UnsupportedEncodingException e) {}
224        }
225
226        return Collections.unmodifiableMap(map);
227    }
228
229    /**
230     * Takes a stringified event and returns an event object.
231     * <p>
232     * A text in argument is generated by {@code outbound} of client option and
233     * this is akin to {@code inbound} of client option.
234     */
235    protected Map<String, Object> parseEvent(String text) {
236        try {
237            return new ObjectMapper().readValue(text, new TypeReference<Map<String, Object>>() {});
238        } catch (IOException e) {
239            throw new RuntimeException(e);
240        }
241    }
242
243    /**
244     * Takes an event object and returns a stringified event.
245     * <p>
246     * This is akin to {@code outbound} of client option and a returned value will
247     * be handled by {@code inbound} of client option.
248     */
249    protected String stringifyEvent(Map<String, Object> event) {
250        try {
251            return new ObjectMapper().writeValueAsString(event);
252        } catch (JsonProcessingException e) {
253            throw new RuntimeException(e);
254        }
255    }
256    
257    @Override
258    public Sentence all() {
259        return new Sentence(new Action<Action<Socket>>() {
260            @Override
261            public void on(Action<Socket> action) {
262                all(action);
263            }
264        });
265    }
266    
267    @Override
268    public Server all(Action<Socket> action) {
269        for (Socket socket : sockets.values()) {
270            action.on(socket);
271        }
272        return this;
273    }
274    
275    @Override
276    public Sentence byId(final String id) {
277        return new Sentence(new Action<Action<Socket>>() {
278            @Override
279            public void on(Action<Socket> action) {
280                byId(id, action);
281            }
282        });
283    }
284
285    @Override
286    public Server byId(String id, Action<Socket> action) {
287        Socket socket = sockets.get(id);
288        if (socket != null) {
289            action.on(socket);
290        }
291        return this;
292    }
293
294    @Override
295    public Sentence byTag(final String... names) {
296        return new Sentence(new Action<Action<Socket>>() {
297            @Override
298            public void on(Action<Socket> action) {
299                byTag(names, action);
300            }
301        });
302    }
303
304    @Override
305    public Server byTag(String name, Action<Socket> action) {
306        return byTag(new String[] { name }, action);
307    }
308
309    @Override
310    public Server byTag(String[] names, Action<Socket> action) {
311        List<String> nameList = Arrays.asList(names);
312        for (Socket socket : sockets.values()) {
313            if (socket.tags().containsAll(nameList)) {
314                action.on(socket);
315            }
316        }
317        return this;
318    }
319
320    @Override
321    public Server socketAction(Action<Socket> action) {
322        socketActions.add(action);
323        return this;
324    }
325
326    @Override
327    public Action<ServerHttpExchange> httpAction() {
328        return httpAction;
329    }
330
331    @Override
332    public Action<ServerWebSocket> websocketAction() {
333        return websocketAction;
334    }
335
336    private abstract class Transport {
337        final Map<String, String> params;
338        Actions<String> messageActions = new ConcurrentActions<>();
339        Actions<Void> closeActions = new ConcurrentActions<>();
340
341        Transport(Map<String, String> params) {
342            this.params = params;
343        }
344        
345        abstract String uri();
346        abstract void send(String data);
347        abstract void close();
348    }
349
350    private class WebSocketTransport extends Transport {
351        final ServerWebSocket ws;
352        
353        WebSocketTransport(Map<String, String> params, ServerWebSocket ws) {
354            super(params);
355            this.ws = ws;
356            ws.closeAction(new VoidAction() {
357                @Override
358                public void on() {
359                    closeActions.fire();
360                }
361            })
362            .messageAction(new Action<Data>() {
363                @Override
364                public void on(Data data) {
365                    messageActions.fire(data.as(String.class));
366                }
367            });
368        }
369        
370        @Override
371        String uri() {
372            return ws.uri();
373        }
374
375        @Override
376        synchronized void send(String data) {
377            ws.send(data);
378        }
379
380        @Override
381        synchronized void close() {
382            ws.close();
383        }
384    }
385    
386    private abstract class HttpTransport extends Transport {
387        final ServerHttpExchange http;
388        
389        HttpTransport(Map<String, String> params, ServerHttpExchange http) {
390            super(params);
391            this.http = http;
392        }
393        
394        @Override
395        String uri() {
396            return http.uri();
397        }
398    }
399
400    final static String text2KB = CharBuffer.allocate(2048).toString().replace('\0', ' ');
401    
402    private class StreamTransport extends HttpTransport {
403        final boolean isAndroidLowerThan3;
404        
405        StreamTransport(Map<String, String> params, ServerHttpExchange http) {
406            super(params, http);
407            String ua = http.requestHeader("user-agent");
408            this.isAndroidLowerThan3 = ua == null ? false : ua.matches(".*Android\\s[23]\\..*");
409            http.closeAction(new VoidAction() {
410                @Override
411                public void on() {
412                    closeActions.fire();
413                }
414            })
415            .setResponseHeader("content-type",
416                "text/" + (params.get("transport").equals("sse") ? "event-stream" : "plain") + "; charset=utf-8")
417            .write((isAndroidLowerThan3 ? text2KB : "") + text2KB + "\n");
418        }
419        
420        @Override
421        synchronized void send(String data) {
422            String payload = (isAndroidLowerThan3 ? text2KB + text2KB : "") + "";
423            for (String datum : data.split("\r\n|\r|\n")) {
424                payload += "data: " + datum + "\n";
425            }
426            payload += "\n";
427            http.write(payload);
428        }
429        
430        @Override
431        synchronized void close() {
432            http.close();
433        }
434    }
435    
436    private class LongpollTransport extends HttpTransport {
437        AtomicReference<ServerHttpExchange> httpRef = new AtomicReference<>();
438        AtomicBoolean closed = new AtomicBoolean();
439        AtomicBoolean written = new AtomicBoolean();
440        Set<String> buffer = new CopyOnWriteArraySet<>(); 
441        AtomicReference<Timer> closeTimer = new AtomicReference<>();
442        
443        LongpollTransport(Map<String, String> params, ServerHttpExchange http) {
444            super(params, http);
445            refresh(http);
446        }
447
448        void refresh(ServerHttpExchange http) {
449            final Map<String, String> parameters = parseURI(http.uri());
450            http.closeAction(new VoidAction() {
451                @Override
452                public void on() {
453                    closed.set(true);
454                    if (parameters.get("when").equals("poll") && !written.get()) {
455                        closeActions.fire();
456                    }
457                    Timer timer = new Timer(true);
458                    timer.schedule(new TimerTask() {
459                        @Override
460                        public void run() {
461                            closeActions.fire();
462                        }
463                    }, 500);
464                    closeTimer.set(timer);
465                }
466            })
467            .setResponseHeader("content-type", 
468                "text/" + (params.get("transport").equals("longpolljsonp") ? "javascript" : "plain") + "; charset=utf-8");
469            
470            if (parameters.get("when").equals("open")) {
471                http.close();
472            } else {
473                httpRef.set(http);
474                closed.set(false);
475                written.set(false);
476                Timer timer = closeTimer.getAndSet(null);
477                if (timer != null) {
478                    timer.cancel();
479                }
480                if (parameters.containsKey("lastEventIds")) {
481                    String[] lastEventIds = parameters.get("lastEventIds").split(",");
482                    for (String eventId : lastEventIds) {
483                        for (String message : buffer) {
484                            if (eventId.equals(findEventId(message))) {
485                                buffer.remove(message);
486                            }
487                        }
488                    }
489                    if (!buffer.isEmpty()) {
490                        Iterator<String> iterator = buffer.iterator();
491                        String string = iterator.next();
492                        while (iterator.hasNext()) {
493                            string += "," + iterator.next();
494                        }
495                        send("[" + string + "]");
496                    }
497                }
498            }
499        }
500
501        private String findEventId(String text) {
502            Matcher matcher = Pattern.compile("\"id\":\"([^\"]+)\"").matcher(text);
503            matcher.find();
504            return matcher.group(1);
505        }
506
507        @Override
508        synchronized void send(String data) {
509            if (!data.startsWith("[")) {
510                buffer.add(data);
511            }
512            ServerHttpExchange http = httpRef.getAndSet(null);
513            if (http != null && !closed.get()) {
514                written.set(true);
515                String payload;
516                try {
517                    payload = params.get("transport").equals("longpolljsonp") ? 
518                        params.get("callback") + "(" + new ObjectMapper().writeValueAsString(data) + ");" : 
519                        data; 
520                } catch (JsonProcessingException e) {
521                    throw new RuntimeException(e);
522                }
523                http.close(payload);
524            }
525        }
526        
527        @Override
528        synchronized void close() {
529            ServerHttpExchange http = httpRef.getAndSet(null);
530            if (http != null && !closed.get()) {
531                http.close();
532            }
533        }
534    }
535    
536    private class DefaultSocket implements Socket {
537        final Transport transport;
538        Set<String> tags = new CopyOnWriteArraySet<>();
539        ConcurrentMap<String, Actions<Object>> actionsMap = new ConcurrentHashMap<>();
540        ConcurrentMap<String, Action<Object>> replyMap = new ConcurrentHashMap<>();
541
542        DefaultSocket(final Transport transport) {
543            this.transport = transport;
544            transport.closeActions.add(new VoidAction() {
545                @Override
546                public void on() {
547                    sockets.remove(transport.params.get("id"));
548                    Actions<Object> closeActions = actionsMap.get("close");
549                    if (closeActions != null) {
550                        closeActions.fire();
551                    }
552                }
553            });
554            transport.messageActions.add(new Action<String>() {
555                @Override
556                public void on(String text) {
557                    final Map<String, Object> event = parseEvent(text);
558                    Actions<Object> actions = actionsMap.get(event.get("type"));
559                    if (actions != null) {
560                        if ((Boolean) event.get("reply")) {
561                            actions.fire(new Reply<Object>() {
562                                @Override
563                                public Object data() {
564                                    return event.get("data");
565                                }
566
567                                @Override
568                                public void done() {
569                                    done(null);
570                                }
571
572                                @Override
573                                public void done(Object value) {
574                                    sendReply(value, false);
575                                }
576
577                                @Override
578                                public void fail() {
579                                    fail(null);
580                                }
581
582                                @Override
583                                public void fail(Object value) {
584                                    sendReply(value, true);
585                                }
586
587                                AtomicBoolean sent = new AtomicBoolean();
588                                private void sendReply(Object value, boolean exception) {
589                                    if (sent.compareAndSet(false, true)) {
590                                        Map<String, Object> result = new LinkedHashMap<String, Object>();
591                                        result.put("id", event.get("id"));
592                                        result.put("data", value);
593                                        result.put("exception", exception);
594                                        send("reply", result);
595                                    }
596                                }
597                            });
598                        } else {
599                            actions.fire(event.get("data"));
600                        }
601                    }
602                }
603            });
604            on("reply", new Action<Map<String, Object>>() {
605                @Override
606                public void on(Map<String, Object> info) {
607                    Action<Object> reply = replyMap.remove(info.get("id"));
608                    if (reply != null) {
609                        reply.on(info.get("data"));
610                    } else {
611                        log.error("Reply callback not found in socket#{} with info, {}", id(), info);
612                    }
613                }
614            });
615            try {
616                new HeartbeatHelper(Long.valueOf(transport.params.get("heartbeat")));
617            } catch (NumberFormatException e) {}
618            
619            sockets.put(id(), this);
620        }
621        
622        class HeartbeatHelper {
623            final long delay;
624            AtomicReference<Timer> timer = new AtomicReference<>();
625            
626            HeartbeatHelper(long delay) {
627                this.delay = delay;
628                timer.set(createTimer());
629                on("heartbeat", new VoidAction() {
630                    @Override
631                    public void on() {
632                        timer.getAndSet(createTimer()).cancel();
633                        send("heartbeat");
634                    }
635                });
636            }
637            
638            Timer createTimer() {
639                Timer timer = new Timer(true);
640                timer.schedule(new TimerTask() {
641                    @Override
642                    public void run() {
643                        close();
644                    }
645                }, delay);
646                return timer;
647            }
648        }
649
650        @Override
651        public String id() {
652            return transport.params.get("id");
653        }
654
655        @Override
656        public String uri() {
657            return transport.uri();
658        }
659
660        @Override
661        public Set<String> tags() {
662            return tags;
663        }
664
665        @SuppressWarnings("unchecked")
666        @Override
667        public <T> Socket on(String event, Action<T> action) {
668            Actions<Object> actions = actionsMap.get(event);
669            if (actions == null) {
670                Actions<Object> value = new ConcurrentActions<>();
671                actions = actionsMap.putIfAbsent(event, value);
672                if (actions == null) {
673                    actions = value;
674                }
675            }
676            actions.add((Action<Object>) action);
677            return this;
678        }
679
680        @SuppressWarnings("unchecked")
681        @Override
682        public <T> Socket off(String event, Action<T> action) {
683            Actions<Object> actions = actionsMap.get(event);
684            if (actions != null) {
685                actions.remove((Action<Object>) action);
686            }
687            return this;
688        }
689
690        @Override
691        public Socket send(String event) {
692            return send(event, null);
693        }
694
695        @Override
696        public Socket send(String event, Object data) {
697            return send(event, data, null);
698        }
699
700        @SuppressWarnings("unchecked")
701        @Override
702        public <T> Socket send(String type, Object data, Action<T> reply) {
703            String eventId = UUID.randomUUID().toString();
704            Map<String, Object> event = new LinkedHashMap<String, Object>();
705
706            event.put("id", eventId);
707            event.put("type", type);
708            event.put("data", data);
709            event.put("reply", reply != null);
710            
711            String text = stringifyEvent(event);
712            transport.send(text);
713            if (reply != null) {
714                replyMap.put(eventId, (Action<Object>) reply);
715            }
716            return this;
717        }
718
719        @Override
720        public Socket close() {
721            transport.close();
722            return this;
723        }
724
725        @Override
726        public int hashCode() {
727            final int prime = 31;
728            int result = 1;
729            result = prime * result + ((id() == null) ? 0 : id().hashCode());
730            return result;
731        }
732
733        @Override
734        public boolean equals(Object obj) {
735            if (this == obj)
736                return true;
737            if (obj == null)
738                return false;
739            if (getClass() != obj.getClass())
740                return false;
741            DefaultSocket other = (DefaultSocket) obj;
742            if (id() == null) {
743                if (other.id() != null)
744                    return false;
745            } else if (!id().equals(other.id()))
746                return false;
747            return true;
748        }
749    }
750
751}