/*
 * Copyright 2012-2014 Donghwan Kim
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.github.flowersinthesand.portal;

import io.github.flowersinthesand.wes.Action;
import io.github.flowersinthesand.wes.Actions;
import io.github.flowersinthesand.wes.ConcurrentActions;
import io.github.flowersinthesand.wes.Data;
import io.github.flowersinthesand.wes.HttpStatus;
import io.github.flowersinthesand.wes.ServerHttpExchange;
import io.github.flowersinthesand.wes.ServerWebSocket;
import io.github.flowersinthesand.wes.VoidAction;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.CharBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Default implementation of {@link Server}.
 * <p>
 * This implementation provides and manages {@link Socket} processing HTTP
 * request and WebSocket following the portal protocol.
 * <p>
 * As options, the following methods can be overridden.
 * <ul>
 * <li>{@link DefaultServer#parseURI(String)}
 * <li>{@link DefaultServer#parseEvent(String)}
 * <li>{@link DefaultServer#stringifyEvent(Map)}
 * </ul>
 *
 * @author Donghwan Kim
 */
public class DefaultServer implements Server {

    private static final Logger log = LoggerFactory.getLogger(DefaultServer.class);

    /** Shared JSON mapper; created once instead of once per event. */
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /** Target type used when reading an event from JSON. */
    private static final TypeReference<Map<String, Object>> EVENT_TYPE =
        new TypeReference<Map<String, Object>>() {};
    /** Extracts the value of the {@code "socket"} property from a raw JSON text. */
    private static final Pattern SOCKET_ID_PATTERN = Pattern.compile("\"socket\":\"([^\"]+)\"");
    /** Extracts the value of the first {@code "id"} property from a raw JSON text. */
    private static final Pattern EVENT_ID_PATTERN = Pattern.compile("\"id\":\"([^\"]+)\"");
    /** Matches user agents containing {@code Android 2.} or {@code Android 3.}. */
    private static final Pattern ANDROID_2_OR_3_PATTERN = Pattern.compile(".*Android\\s[23]\\..*");
    /** Prefix stripped from the body of a POST request before it is treated as an event. */
    private static final String DATA_PREFIX = "data=";

    /** Opened sockets keyed by the {@code id} request parameter. */
    private final ConcurrentMap<String, DefaultSocket> sockets = new ConcurrentHashMap<>();
    /** Actions registered through {@link #socketAction(Action)}; fired with every new socket. */
    private final Actions<Socket> socketActions = new ConcurrentActions<>();

    /**
     * Entry point for HTTP exchanges. GET requests are dispatched on the
     * {@code when} parameter ({@code open}, {@code poll}, {@code abort}) and
     * POST requests carry a message for an existing HTTP based socket.
     */
    private final Action<ServerHttpExchange> httpAction = new Action<ServerHttpExchange>() {
        @Override
        public void on(final ServerHttpExchange http) {
            switch (http.method()) {
            case "GET":
                handleGet(http);
                break;
            case "POST":
                handlePost(http);
                break;
            default:
                log.error("HTTP method, {}, is not supported", http.method());
                http.setStatus(HttpStatus.METHOD_NOT_ALLOWED).close();
                break;
            }
        }

        /** Dispatches a GET request according to its {@code when} parameter. */
        private void handleGet(ServerHttpExchange http) {
            Map<String, String> params = parseURI(http.uri());
            setNocache(http);
            setCors(http);
            String when = params.get("when");
            // Switching on a null string throws, so a missing parameter is
            // routed to the default branch through the empty string.
            switch (when == null ? "" : when) {
            case "open":
                handleOpen(params, http);
                break;
            case "poll":
                handlePoll(params, http);
                break;
            case "abort":
                handleAbort(params, http);
                break;
            default:
                log.error("when, {}, is not supported", when);
                http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
                break;
            }
        }

        /** Creates a socket on top of the transport named by the {@code transport} parameter. */
        private void handleOpen(Map<String, String> params, ServerHttpExchange http) {
            String transport = params.get("transport");
            switch (transport == null ? "" : transport) {
            case "sse":
            case "streamxhr":
            case "streamxdr":
            case "streamiframe":
                socketActions.fire(new DefaultSocket(new StreamTransport(params, http)));
                break;
            case "longpollajax":
            case "longpollxdr":
            case "longpolljsonp":
                socketActions.fire(new DefaultSocket(new LongpollTransport(params, http)));
                break;
            default:
                log.error("Transport, {}, is not supported", transport);
                http.setStatus(HttpStatus.NOT_IMPLEMENTED).close();
                break;
            }
        }

        /** Hands a new poll request over to the long polling transport it belongs to. */
        private void handlePoll(Map<String, String> params, ServerHttpExchange http) {
            String id = params.get("id");
            // ConcurrentHashMap rejects null keys, hence the explicit guard.
            DefaultSocket socket = id == null ? null : sockets.get(id);
            if (socket == null) {
                log.error("Long polling transport#{} is not found in poll request", id);
                http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
                return;
            }
            Transport transport = socket.transport;
            if (transport instanceof LongpollTransport) {
                ((LongpollTransport) transport).refresh(http);
            } else {
                log.error("Non-long polling transport#{} sent poll request", id);
                http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
            }
        }

        /** Closes the socket named by the {@code id} parameter, if any, and ends the request. */
        private void handleAbort(Map<String, String> params, ServerHttpExchange http) {
            String id = params.get("id");
            Socket socket = id == null ? null : sockets.get(id);
            if (socket != null) {
                socket.close();
            }
            http.setResponseHeader("content-type", "text/javascript; charset=utf-8").close();
        }

        /** Reads the body of a POST request and delivers it to the addressed socket. */
        private void handlePost(final ServerHttpExchange http) {
            setNocache(http);
            setCors(http);
            http.bodyAction(new Action<Data>() {
                @Override
                public void on(Data body) {
                    String raw = body.as(String.class);
                    if (raw == null || raw.length() < DATA_PREFIX.length()) {
                        // Too short to carry the prefix: answer instead of
                        // failing in substring and leaving the exchange open.
                        log.error("A POST message, {}, is malformed", raw);
                        http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close();
                        return;
                    }
                    String data = raw.substring(DATA_PREFIX.length());
                    String id = firstGroup(SOCKET_ID_PATTERN, data);

                    DefaultSocket socket = id == null ? null : sockets.get(id);
                    if (socket != null) {
                        Transport transport = socket.transport;
                        if (transport instanceof HttpTransport) {
                            transport.messageActions.fire(data);
                        } else {
                            log.error("Non-HTTP socket#{} receives a POST message", id);
                            http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
                        }
                    } else {
                        log.error("A POST message arrived but no socket#{} is found", id);
                        http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
                    }
                    http.close();
                }
            });
        }

        /** Adds response headers that forbid caching. */
        private void setNocache(ServerHttpExchange http) {
            http
            .setResponseHeader("cache-control", "no-cache, no-store, must-revalidate")
            .setResponseHeader("pragma", "no-cache")
            .setResponseHeader("expires", "0");
        }

        /**
         * Adds CORS response headers: the request's {@code origin} (or
         * {@code *} when absent) is allowed with credentials, and any
         * requested headers are echoed back as allowed.
         */
        private void setCors(ServerHttpExchange http) {
            String origin = http.requestHeader("origin");
            String acrh = http.requestHeader("access-control-request-headers");
            http
            .setResponseHeader("access-control-allow-origin", origin != null ? origin : "*")
            .setResponseHeader("access-control-allow-credentials", "true");
            if (acrh != null) {
                http.setResponseHeader("access-control-allow-headers", acrh);
            }
        }
    };

    /** Entry point for WebSockets: every WebSocket becomes a new socket. */
    private final Action<ServerWebSocket> websocketAction = new Action<ServerWebSocket>() {
        @Override
        public void on(ServerWebSocket ws) {
            Map<String, String> params = parseURI(ws.uri());
            socketActions.fire(new DefaultSocket(new WebSocketTransport(params, ws)));
        }
    };

    /**
     * Returns the first capturing group of the first match of {@code pattern}
     * in {@code text}, or {@code null} if the pattern does not match.
     */
    private static String firstGroup(Pattern pattern, String text) {
        Matcher matcher = pattern.matcher(text);
        return matcher.find() ? matcher.group(1) : null;
    }

    /**
     * Takes a portal URI and returns a map of parameters.
     * <p>
     * This is a counterpart of {@code urlBuilder} of client option.
     * <p>
     * Parameters with an empty name are skipped and a parameter without a
     * value maps to the empty string. The returned map is unmodifiable unless
     * the URI has no query, in which case an empty modifiable map is returned.
     *
     * @param uri the request URI
     * @return the decoded query parameters in order of appearance
     */
    protected Map<String, String> parseURI(String uri) {
        Map<String, String> map = new LinkedHashMap<>();
        String query = URI.create(uri).getQuery();
        if (query == null || query.isEmpty()) {
            return map;
        }

        for (String param : query.split("&")) {
            try {
                String[] pair = param.split("=", 2);
                String name = URLDecoder.decode(pair[0], "UTF-8");
                // Compare by content; '==' only compares references.
                if (name.isEmpty()) {
                    continue;
                }

                map.put(name, pair.length > 1 ? URLDecoder.decode(pair[1], "UTF-8") : "");
            } catch (UnsupportedEncodingException ignored) {
                // Every Java platform is required to support UTF-8, so this
                // branch is not expected to run.
            }
        }

        return Collections.unmodifiableMap(map);
    }

    /**
     * Takes a stringified event and returns an event object.
     * <p>
     * A text in argument is generated by {@code outbound} of client option and
     * this is akin to {@code inbound} of client option.
     *
     * @param text the JSON text of an event
     * @return the event as a map
     * @throws RuntimeException wrapping the {@link IOException} if the text cannot be read
     */
    protected Map<String, Object> parseEvent(String text) {
        try {
            return MAPPER.readValue(text, EVENT_TYPE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Takes an event object and returns a stringified event.
     * <p>
     * This is akin to {@code outbound} of client option and a returned value will
     * be handled by {@code inbound} of client option.
     *
     * @param event the event to serialize
     * @return the JSON text of the event
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    protected String stringifyEvent(Map<String, Object> event) {
        try {
            return MAPPER.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Sentence all() {
        return new Sentence(new Action<Action<Socket>>() {
            @Override
            public void on(Action<Socket> action) {
                all(action);
            }
        });
    }

    @Override
    public Server all(Action<Socket> action) {
        for (Socket socket : sockets.values()) {
            action.on(socket);
        }
        return this;
    }

    @Override
    public Sentence byId(final String id) {
        return new Sentence(new Action<Action<Socket>>() {
            @Override
            public void on(Action<Socket> action) {
                byId(id, action);
            }
        });
    }

    @Override
    public Server byId(String id, Action<Socket> action) {
        // A null id cannot name a socket and is not a legal map key.
        Socket socket = id == null ? null : sockets.get(id);
        if (socket != null) {
            action.on(socket);
        }
        return this;
    }

    @Override
    public Sentence byTag(final String... names) {
        return new Sentence(new Action<Action<Socket>>() {
            @Override
            public void on(Action<Socket> action) {
                byTag(names, action);
            }
        });
    }

    @Override
    public Server byTag(String name, Action<Socket> action) {
        return byTag(new String[] { name }, action);
    }

    @Override
    public Server byTag(String[] names, Action<Socket> action) {
        List<String> nameList = Arrays.asList(names);
        for (Socket socket : sockets.values()) {
            // Only sockets carrying every given tag are selected.
            if (socket.tags().containsAll(nameList)) {
                action.on(socket);
            }
        }
        return this;
    }

    @Override
    public Server socketAction(Action<Socket> action) {
        socketActions.add(action);
        return this;
    }

    @Override
    public Action<ServerHttpExchange> httpAction() {
        return httpAction;
    }

    @Override
    public Action<ServerWebSocket> websocketAction() {
        return websocketAction;
    }

    /** Channel underneath a socket; exposes message and close notifications. */
    private abstract class Transport {
        /** Parameters parsed from the URI of the request that opened the transport. */
        final Map<String, String> params;
        final Actions<String> messageActions = new ConcurrentActions<>();
        final Actions<Void> closeActions = new ConcurrentActions<>();

        Transport(Map<String, String> params) {
            this.params = params;
        }

        abstract String uri();
        abstract void send(String data);
        abstract void close();
    }

    /** Transport backed by a WebSocket. */
    private class WebSocketTransport extends Transport {
        final ServerWebSocket ws;

        WebSocketTransport(Map<String, String> params, ServerWebSocket ws) {
            super(params);
            this.ws = ws;
            ws.closeAction(new VoidAction() {
                @Override
                public void on() {
                    closeActions.fire();
                }
            })
            .messageAction(new Action<Data>() {
                @Override
                public void on(Data data) {
                    messageActions.fire(data.as(String.class));
                }
            });
        }

        @Override
        String uri() {
            return ws.uri();
        }

        @Override
        synchronized void send(String data) {
            ws.send(data);
        }

        @Override
        synchronized void close() {
            ws.close();
        }
    }

    /** Base of the transports backed by HTTP exchanges. */
    private abstract class HttpTransport extends Transport {
        /** The exchange that opened the transport. */
        final ServerHttpExchange http;

        HttpTransport(Map<String, String> params, ServerHttpExchange http) {
            super(params);
            this.http = http;
        }

        @Override
        String uri() {
            return http.uri();
        }
    }

    /** 2048 space characters, written as padding by the streaming transport. */
    final static String text2KB = CharBuffer.allocate(2048).toString().replace('\0', ' ');

    /** Transport that keeps one HTTP response open and writes every message to it. */
    private class StreamTransport extends HttpTransport {
        /** Whether the user agent matched {@code Android 2.x} or {@code Android 3.x}. */
        final boolean isAndroidLowerThan3;

        StreamTransport(Map<String, String> params, ServerHttpExchange http) {
            super(params, http);
            String ua = http.requestHeader("user-agent");
            this.isAndroidLowerThan3 = ua != null && ANDROID_2_OR_3_PATTERN.matcher(ua).matches();
            http.closeAction(new VoidAction() {
                @Override
                public void on() {
                    closeActions.fire();
                }
            })
            .setResponseHeader("content-type",
                "text/" + ("sse".equals(params.get("transport")) ? "event-stream" : "plain") + "; charset=utf-8")
            .write((isAndroidLowerThan3 ? text2KB : "") + text2KB + "\n");
        }

        /**
         * Writes {@code data} as one message: every line is prefixed with
         * {@code data: } and the message is terminated by an empty line.
         */
        @Override
        synchronized void send(String data) {
            StringBuilder payload = new StringBuilder();
            if (isAndroidLowerThan3) {
                payload.append(text2KB).append(text2KB);
            }
            for (String datum : data.split("\r\n|\r|\n")) {
                payload.append("data: ").append(datum).append("\n");
            }
            payload.append("\n");
            http.write(payload.toString());
        }

        @Override
        synchronized void close() {
            http.close();
        }
    }

    /**
     * Transport that answers each poll request with at most one message and
     * expects the client to issue a new poll request afterwards.
     */
    private class LongpollTransport extends HttpTransport {
        /** The pending poll exchange, or {@code null} when none is waiting. */
        AtomicReference<ServerHttpExchange> httpRef = new AtomicReference<>();
        /** Whether the latest exchange has been closed. */
        AtomicBoolean closed = new AtomicBoolean();
        /** Whether a message has been written to the latest poll exchange. */
        AtomicBoolean written = new AtomicBoolean();
        /** Sent messages the client has not yet acknowledged via {@code lastEventIds}. */
        Set<String> buffer = new CopyOnWriteArraySet<>();
        /** Timer that fires the close actions unless the next poll request arrives first. */
        AtomicReference<Timer> closeTimer = new AtomicReference<>();

        LongpollTransport(Map<String, String> params, ServerHttpExchange http) {
            super(params, http);
            refresh(http);
        }

        /**
         * Installs {@code http} as the current exchange. The opening request
         * is closed right away; a poll request is kept pending, cancels the
         * close timer, drops acknowledged messages from the buffer and
         * re-sends the remaining ones.
         */
        void refresh(ServerHttpExchange http) {
            final Map<String, String> parameters = parseURI(http.uri());
            final String when = parameters.get("when");
            http.closeAction(new VoidAction() {
                @Override
                public void on() {
                    closed.set(true);
                    // A poll exchange closed without a response being written.
                    if ("poll".equals(when) && !written.get()) {
                        closeActions.fire();
                    }
                    // NOTE(review): the timer below is scheduled even when the
                    // close actions were just fired above, so they may fire
                    // twice; confirm this is intended before changing it.
                    Timer timer = new Timer(true);
                    timer.schedule(new TimerTask() {
                        @Override
                        public void run() {
                            closeActions.fire();
                        }
                    }, 500);
                    closeTimer.set(timer);
                }
            })
            .setResponseHeader("content-type",
                "text/" + ("longpolljsonp".equals(params.get("transport")) ? "javascript" : "plain") + "; charset=utf-8");

            if ("open".equals(when)) {
                http.close();
                return;
            }

            httpRef.set(http);
            closed.set(false);
            written.set(false);
            Timer timer = closeTimer.getAndSet(null);
            if (timer != null) {
                timer.cancel();
            }
            if (parameters.containsKey("lastEventIds")) {
                String[] lastEventIds = parameters.get("lastEventIds").split(",");
                for (String eventId : lastEventIds) {
                    // Iteration works on a snapshot of the copy-on-write set,
                    // so removing while iterating is safe here.
                    for (String message : buffer) {
                        if (eventId.equals(findEventId(message))) {
                            buffer.remove(message);
                        }
                    }
                }
                if (!buffer.isEmpty()) {
                    StringBuilder array = new StringBuilder("[");
                    String separator = "";
                    for (String message : buffer) {
                        array.append(separator).append(message);
                        separator = ",";
                    }
                    send(array.append("]").toString());
                }
            }
        }

        /** Returns the event id found in {@code text}, or {@code null} if there is none. */
        private String findEventId(String text) {
            return firstGroup(EVENT_ID_PATTERN, text);
        }

        /**
         * Buffers {@code data}, unless it is an array of already buffered
         * messages, and completes the pending poll exchange with it if there
         * is one.
         */
        @Override
        synchronized void send(String data) {
            if (!data.startsWith("[")) {
                buffer.add(data);
            }
            ServerHttpExchange http = httpRef.getAndSet(null);
            if (http != null && !closed.get()) {
                written.set(true);
                String payload;
                try {
                    payload = "longpolljsonp".equals(params.get("transport")) ?
                        params.get("callback") + "(" + MAPPER.writeValueAsString(data) + ");" :
                        data;
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                http.close(payload);
            }
        }

        @Override
        synchronized void close() {
            ServerHttpExchange http = httpRef.getAndSet(null);
            if (http != null && !closed.get()) {
                http.close();
            }
        }
    }

    /** {@link Socket} implementation that exchanges events over a {@link Transport}. */
    private class DefaultSocket implements Socket {
        final Transport transport;
        Set<String> tags = new CopyOnWriteArraySet<>();
        /** Event handlers keyed by event type. */
        ConcurrentMap<String, Actions<Object>> actionsMap = new ConcurrentHashMap<>();
        /** Pending reply callbacks keyed by the id of the event that was sent. */
        ConcurrentMap<String, Action<Object>> replyMap = new ConcurrentHashMap<>();

        DefaultSocket(final Transport transport) {
            this.transport = transport;
            transport.closeActions.add(new VoidAction() {
                @Override
                public void on() {
                    sockets.remove(transport.params.get("id"));
                    Actions<Object> closeActions = actionsMap.get("close");
                    if (closeActions != null) {
                        closeActions.fire();
                    }
                }
            });
            transport.messageActions.add(new Action<String>() {
                @Override
                public void on(String text) {
                    final Map<String, Object> event = parseEvent(text);
                    Object type = event.get("type");
                    // An event without a type has no handlers; a null key
                    // would make the concurrent map throw.
                    Actions<Object> actions = type == null ? null : actionsMap.get(type);
                    if (actions == null) {
                        return;
                    }
                    // Treat a missing "reply" field as false instead of failing on unboxing.
                    if (Boolean.TRUE.equals(event.get("reply"))) {
                        actions.fire(new Reply<Object>() {
                            @Override
                            public Object data() {
                                return event.get("data");
                            }

                            @Override
                            public void done() {
                                done(null);
                            }

                            @Override
                            public void done(Object value) {
                                sendReply(value, false);
                            }

                            @Override
                            public void fail() {
                                fail(null);
                            }

                            @Override
                            public void fail(Object value) {
                                sendReply(value, true);
                            }

                            // Only the first done/fail call sends a reply.
                            AtomicBoolean sent = new AtomicBoolean();
                            private void sendReply(Object value, boolean exception) {
                                if (sent.compareAndSet(false, true)) {
                                    Map<String, Object> result = new LinkedHashMap<String, Object>();
                                    result.put("id", event.get("id"));
                                    result.put("data", value);
                                    result.put("exception", exception);
                                    send("reply", result);
                                }
                            }
                        });
                    } else {
                        actions.fire(event.get("data"));
                    }
                }
            });
            on("reply", new Action<Map<String, Object>>() {
                @Override
                public void on(Map<String, Object> info) {
                    Object replyId = info.get("id");
                    Action<Object> reply = replyId == null ? null : replyMap.remove(replyId);
                    if (reply != null) {
                        reply.on(info.get("data"));
                    } else {
                        log.error("Reply callback not found in socket#{} with info, {}", id(), info);
                    }
                }
            });
            try {
                new HeartbeatHelper(Long.parseLong(transport.params.get("heartbeat")));
            } catch (NumberFormatException ignored) {
                // A missing or non-numeric heartbeat parameter means the
                // socket runs without heartbeat.
            }

            sockets.put(id(), this);
        }

        /**
         * Closes the socket if no {@code heartbeat} event arrives within
         * {@code delay} milliseconds, and echoes every heartbeat back.
         */
        class HeartbeatHelper {
            final long delay;
            AtomicReference<Timer> timer = new AtomicReference<>();

            HeartbeatHelper(long delay) {
                this.delay = delay;
                timer.set(createTimer());
                on("heartbeat", new VoidAction() {
                    @Override
                    public void on() {
                        // Restart the countdown, then answer the heartbeat.
                        timer.getAndSet(createTimer()).cancel();
                        send("heartbeat");
                    }
                });
            }

            /** Schedules a one-shot daemon timer that closes the socket after {@code delay}. */
            Timer createTimer() {
                Timer timer = new Timer(true);
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        close();
                    }
                }, delay);
                return timer;
            }
        }

        @Override
        public String id() {
            return transport.params.get("id");
        }

        @Override
        public String uri() {
            return transport.uri();
        }

        @Override
        public Set<String> tags() {
            return tags;
        }

        @SuppressWarnings("unchecked")
        @Override
        public <T> Socket on(String event, Action<T> action) {
            Actions<Object> actions = actionsMap.get(event);
            if (actions == null) {
                Actions<Object> value = new ConcurrentActions<>();
                // Another thread may have registered the event meanwhile; keep its instance.
                actions = actionsMap.putIfAbsent(event, value);
                if (actions == null) {
                    actions = value;
                }
            }
            actions.add((Action<Object>) action);
            return this;
        }

        @SuppressWarnings("unchecked")
        @Override
        public <T> Socket off(String event, Action<T> action) {
            Actions<Object> actions = actionsMap.get(event);
            if (actions != null) {
                actions.remove((Action<Object>) action);
            }
            return this;
        }

        @Override
        public Socket send(String event) {
            return send(event, null);
        }

        @Override
        public Socket send(String event, Object data) {
            return send(event, data, null);
        }

        @SuppressWarnings("unchecked")
        @Override
        public <T> Socket send(String type, Object data, Action<T> reply) {
            String eventId = UUID.randomUUID().toString();
            Map<String, Object> event = new LinkedHashMap<String, Object>();

            event.put("id", eventId);
            event.put("type", type);
            event.put("data", data);
            event.put("reply", reply != null);

            String text = stringifyEvent(event);
            // Register the callback before sending so that a reply arriving
            // immediately still finds it.
            if (reply != null) {
                replyMap.put(eventId, (Action<Object>) reply);
            }
            try {
                transport.send(text);
            } catch (RuntimeException e) {
                // The event never left, so no reply can arrive for it.
                replyMap.remove(eventId);
                throw e;
            }
            return this;
        }

        @Override
        public Socket close() {
            transport.close();
            return this;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((id() == null) ? 0 : id().hashCode());
            return result;
        }

        /** Two sockets are equal when they are of the same class and have equal ids. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            DefaultSocket other = (DefaultSocket) obj;
            if (id() == null) {
                return other.id() == null;
            }
            return id().equals(other.id());
        }
    }

}