001/* 002 * Copyright 2012-2014 Donghwan Kim 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.github.flowersinthesand.portal; 017 018import io.github.flowersinthesand.wes.Action; 019import io.github.flowersinthesand.wes.Actions; 020import io.github.flowersinthesand.wes.ConcurrentActions; 021import io.github.flowersinthesand.wes.Data; 022import io.github.flowersinthesand.wes.HttpStatus; 023import io.github.flowersinthesand.wes.ServerHttpExchange; 024import io.github.flowersinthesand.wes.ServerWebSocket; 025import io.github.flowersinthesand.wes.VoidAction; 026 027import java.io.IOException; 028import java.io.UnsupportedEncodingException; 029import java.net.URI; 030import java.net.URLDecoder; 031import java.nio.CharBuffer; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.Iterator; 035import java.util.LinkedHashMap; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Timer; 040import java.util.TimerTask; 041import java.util.UUID; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.concurrent.ConcurrentMap; 044import java.util.concurrent.CopyOnWriteArraySet; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicReference; 047import java.util.regex.Matcher; 048import java.util.regex.Pattern; 049 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import com.fasterxml.jackson.core.JsonProcessingException; 054import com.fasterxml.jackson.core.type.TypeReference; 055import com.fasterxml.jackson.databind.ObjectMapper; 056 057/** 058 * Default implementation of {@link Server}. 059 * <p> 060 * This implementation provides and manages {@link Socket} processing HTTP 061 * request and WebSocket following the portal protocol. 062 * <p> 063 * As options, the following methods can be overridden. 064 * <ul> 065 * <li>{@link DefaultServer#parseURI(String)} 066 * <li>{@link DefaultServer#parseEvent(String)} 067 * <li>{@link DefaultServer#stringifyEvent(Map)} 068 * </ul> 069 * 070 * @author Donghwan Kim 071 */ 072public class DefaultServer implements Server { 073 074 private final Logger log = LoggerFactory.getLogger(DefaultServer.class); 075 private ConcurrentMap<String, DefaultSocket> sockets = new ConcurrentHashMap<>(); 076 private Actions<Socket> socketActions = new ConcurrentActions<>(); 077 078 private Action<ServerHttpExchange> httpAction = new Action<ServerHttpExchange>() { 079 @Override 080 public void on(final ServerHttpExchange http) { 081 switch (http.method()) { 082 case "GET": 083 Map<String, String> params = parseURI(http.uri()); 084 setNocache(http); 085 setCors(http); 086 switch (params.get("when")) { 087 case "open": 088 switch (params.get("transport")) { 089 case "sse": 090 case "streamxhr": 091 case "streamxdr": 092 case "streamiframe": 093 socketActions.fire(new DefaultSocket(new StreamTransport(params, http))); 094 break; 095 case "longpollajax": 096 case "longpollxdr": 097 case "longpolljsonp": 098 socketActions.fire(new DefaultSocket(new LongpollTransport(params, http))); 099 break; 100 default: 101 log.error("Transport, {}, is not supported", params.get("transport")); 102 http.setStatus(HttpStatus.NOT_IMPLEMENTED).close(); 103 } 104 break; 105 case "poll": { 106 String id = params.get("id"); 107 DefaultSocket socket = sockets.get(id); 108 if (socket != null) { 109 Transport transport = socket.transport; 110 if (transport instanceof LongpollTransport) { 111 ((LongpollTransport) transport).refresh(http); 112 } else { 113 log.error("Non-long polling transport#{} sent poll request", id); 114 http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close(); 115 } 116 } else { 117 log.error("Long polling transport#{} is not found in poll request", id); 118 http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR).close(); 119 } 120 break; } 121 case "abort": { 122 String id = params.get("id"); 123 Socket socket = sockets.get(id); 124 if (socket != null) { 125 socket.close(); 126 } 127 http.setResponseHeader("content-type", "text/javascript; charset=utf-8").close(); 128 break; } 129 default: 130 log.error("when, {}, is not supported", params.get("when")); 131 http.setStatus(HttpStatus.NOT_IMPLEMENTED).close(); 132 break; 133 } 134 break; 135 case "POST": 136 setNocache(http); 137 setCors(http); 138 http.bodyAction(new Action<Data>() { 139 @Override 140 public void on(Data body) { 141 String data = body.as(String.class).substring("data=".length()); 142 String id = findSocketId(data); 143 144 DefaultSocket socket = sockets.get(id); 145 if (socket != null) { 146 Transport transport = socket.transport; 147 if (transport instanceof HttpTransport) { 148 transport.messageActions.fire(data); 149 } else { 150 log.error("Non-HTTP socket#{} receives a POST message", id); 151 http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR); 152 } 153 } else { 154 log.error("A POST message arrived but no socket#{} is found", id); 155 http.setStatus(HttpStatus.INTERNAL_SERVER_ERROR); 156 } 157 http.close(); 158 }; 159 160 private String findSocketId(String text) { 161 Matcher matcher = Pattern.compile("\"socket\":\"([^\"]+)\"").matcher(text); 162 matcher.find(); 163 return matcher.group(1); 164 } 165 }); 166 break; 167 default: 168 log.error("HTTP method, {}, is not supported", http.method()); 169 http.setStatus(HttpStatus.METHOD_NOT_ALLOWED).close(); 170 break; 171 } 172 } 173 174 private void setNocache(ServerHttpExchange http) { 175 http 176 .setResponseHeader("cache-control", "no-cache, no-store, must-revalidate") 177 .setResponseHeader("pragma", "no-cache") 178 .setResponseHeader("expires", "0"); 179 } 180 181 private void setCors(ServerHttpExchange http) { 182 String origin = http.requestHeader("origin"); 183 String acrh = http.requestHeader("access-control-request-headers"); 184 http 185 .setResponseHeader("access-control-allow-origin", origin != null ? origin : "*") 186 .setResponseHeader("access-control-allow-credentials", "true"); 187 if (acrh != null) { 188 http.setResponseHeader("access-control-allow-headers", acrh); 189 } 190 } 191 }; 192 193 private Action<ServerWebSocket> websocketAction = new Action<ServerWebSocket>() { 194 @Override 195 public void on(ServerWebSocket ws) { 196 Map<String, String> params = parseURI(ws.uri()); 197 socketActions.fire(new DefaultSocket(new WebSocketTransport(params, ws))); 198 } 199 }; 200 201 /** 202 * Takes a portal URI and returns a map of parameters. 203 * <p> 204 * This is a counterpart of {@code urlBuilder} of client option. 205 */ 206 protected Map<String, String> parseURI(String uri) { 207 Map<String, String> map = new LinkedHashMap<>(); 208 String query = URI.create(uri).getQuery(); 209 if ((query == null) || (query.equals(""))) { 210 return map; 211 } 212 213 String[] params = query.split("&"); 214 for (String param : params) { 215 try { 216 String[] pair = param.split("=", 2); 217 String name = URLDecoder.decode(pair[0], "UTF-8"); 218 if (name == "") { 219 continue; 220 } 221 222 map.put(name, pair.length > 1 ? URLDecoder.decode(pair[1], "UTF-8") : ""); 223 } catch (UnsupportedEncodingException e) {} 224 } 225 226 return Collections.unmodifiableMap(map); 227 } 228 229 /** 230 * Takes a stringified event and returns an event object. 231 * <p> 232 * A text in argument is generated by {@code outbound} of client option and 233 * this is akin to {@code inbound} of client option. 234 */ 235 protected Map<String, Object> parseEvent(String text) { 236 try { 237 return new ObjectMapper().readValue(text, new TypeReference<Map<String, Object>>() {}); 238 } catch (IOException e) { 239 throw new RuntimeException(e); 240 } 241 } 242 243 /** 244 * Takes an event object and returns a stringified event. 245 * <p> 246 * This is akin to {@code outbound} of client option and a returned value will 247 * be handled by {@code inbound} of client option. 248 */ 249 protected String stringifyEvent(Map<String, Object> event) { 250 try { 251 return new ObjectMapper().writeValueAsString(event); 252 } catch (JsonProcessingException e) { 253 throw new RuntimeException(e); 254 } 255 } 256 257 @Override 258 public Server all(Action<Socket> action) { 259 for (Socket socket : sockets.values()) { 260 action.on(socket); 261 } 262 return this; 263 } 264 265 @Override 266 public Server byId(String id, Action<Socket> action) { 267 Socket socket = sockets.get(id); 268 if (socket != null) { 269 action.on(socket); 270 } 271 return this; 272 } 273 274 @Override 275 public Server byTag(String name, Action<Socket> action) { 276 return byTag(new String[] { name }, action); 277 } 278 279 @Override 280 public Server byTag(String[] names, Action<Socket> action) { 281 List<String> nameList = Arrays.asList(names); 282 for (Socket socket : sockets.values()) { 283 if (socket.tags().containsAll(nameList)) { 284 action.on(socket); 285 } 286 } 287 return this; 288 } 289 290 @Override 291 public Server socketAction(Action<Socket> action) { 292 socketActions.add(action); 293 return this; 294 } 295 296 @Override 297 public Action<ServerHttpExchange> httpAction() { 298 return httpAction; 299 } 300 301 @Override 302 public Action<ServerWebSocket> websocketAction() { 303 return websocketAction; 304 } 305 306 private abstract class Transport { 307 final Map<String, String> params; 308 Actions<String> messageActions = new ConcurrentActions<>(); 309 Actions<Void> closeActions = new ConcurrentActions<>(); 310 311 Transport(Map<String, String> params) { 312 this.params = params; 313 } 314 315 abstract String uri(); 316 abstract void send(String data); 317 abstract void close(); 318 } 319 320 private class WebSocketTransport extends Transport { 321 final ServerWebSocket ws; 322 323 WebSocketTransport(Map<String, String> params, ServerWebSocket ws) { 324 super(params); 325 this.ws = ws; 326 ws.closeAction(new VoidAction() { 327 @Override 328 public void on() { 329 closeActions.fire(); 330 } 331 }) 332 .messageAction(new Action<Data>() { 333 @Override 334 public void on(Data data) { 335 messageActions.fire(data.as(String.class)); 336 } 337 }); 338 } 339 340 @Override 341 String uri() { 342 return ws.uri(); 343 } 344 345 @Override 346 synchronized void send(String data) { 347 ws.send(data); 348 } 349 350 @Override 351 synchronized void close() { 352 ws.close(); 353 } 354 } 355 356 private abstract class HttpTransport extends Transport { 357 final ServerHttpExchange http; 358 359 HttpTransport(Map<String, String> params, ServerHttpExchange http) { 360 super(params); 361 this.http = http; 362 } 363 364 @Override 365 String uri() { 366 return http.uri(); 367 } 368 } 369 370 final static String text2KB = CharBuffer.allocate(2048).toString().replace('\0', ' '); 371 372 private class StreamTransport extends HttpTransport { 373 final boolean isAndroidLowerThan3; 374 375 StreamTransport(Map<String, String> params, ServerHttpExchange http) { 376 super(params, http); 377 String ua = http.requestHeader("user-agent"); 378 this.isAndroidLowerThan3 = ua == null ? false : ua.matches(".*Android\\s[23]\\..*"); 379 http.closeAction(new VoidAction() { 380 @Override 381 public void on() { 382 closeActions.fire(); 383 } 384 }) 385 .setResponseHeader("content-type", 386 "text/" + (params.get("transport").equals("sse") ? "event-stream" : "plain") + "; charset=utf-8") 387 .write((isAndroidLowerThan3 ? text2KB : "") + text2KB + "\n"); 388 } 389 390 @Override 391 synchronized void send(String data) { 392 String payload = (isAndroidLowerThan3 ? text2KB + text2KB : "") + ""; 393 for (String datum : data.split("\r\n|\r|\n")) { 394 payload += "data: " + datum + "\n"; 395 } 396 payload += "\n"; 397 http.write(payload); 398 } 399 400 @Override 401 synchronized void close() { 402 http.close(); 403 } 404 } 405 406 private class LongpollTransport extends HttpTransport { 407 AtomicReference<ServerHttpExchange> httpRef = new AtomicReference<>(); 408 AtomicBoolean closed = new AtomicBoolean(); 409 AtomicBoolean written = new AtomicBoolean(); 410 Set<String> buffer = new CopyOnWriteArraySet<>(); 411 AtomicReference<Timer> closeTimer = new AtomicReference<>(); 412 413 LongpollTransport(Map<String, String> params, ServerHttpExchange http) { 414 super(params, http); 415 refresh(http); 416 } 417 418 void refresh(ServerHttpExchange http) { 419 final Map<String, String> parameters = parseURI(http.uri()); 420 http.closeAction(new VoidAction() { 421 @Override 422 public void on() { 423 closed.set(true); 424 if (parameters.get("when").equals("poll") && !written.get()) { 425 closeActions.fire(); 426 } 427 Timer timer = new Timer(true); 428 timer.schedule(new TimerTask() { 429 @Override 430 public void run() { 431 closeActions.fire(); 432 } 433 }, 500); 434 closeTimer.set(timer); 435 } 436 }) 437 .setResponseHeader("content-type", 438 "text/" + (params.get("transport").equals("longpolljsonp") ? "javascript" : "plain") + "; charset=utf-8"); 439 440 if (parameters.get("when").equals("open")) { 441 http.close(); 442 } else { 443 httpRef.set(http); 444 closed.set(false); 445 written.set(false); 446 Timer timer = closeTimer.getAndSet(null); 447 if (timer != null) { 448 timer.cancel(); 449 } 450 if (parameters.containsKey("lastEventIds")) { 451 String[] lastEventIds = parameters.get("lastEventIds").split(","); 452 for (String eventId : lastEventIds) { 453 for (String message : buffer) { 454 if (eventId.equals(findEventId(message))) { 455 buffer.remove(message); 456 } 457 } 458 } 459 if (!buffer.isEmpty()) { 460 Iterator<String> iterator = buffer.iterator(); 461 String string = iterator.next(); 462 while (iterator.hasNext()) { 463 string += "," + iterator.next(); 464 } 465 send("[" + string + "]"); 466 } 467 } 468 } 469 } 470 471 private String findEventId(String text) { 472 Matcher matcher = Pattern.compile("\"id\":\"([^\"]+)\"").matcher(text); 473 matcher.find(); 474 return matcher.group(1); 475 } 476 477 @Override 478 synchronized void send(String data) { 479 if (!data.startsWith("[")) { 480 buffer.add(data); 481 } 482 ServerHttpExchange http = httpRef.getAndSet(null); 483 if (http != null && !closed.get()) { 484 written.set(true); 485 String payload; 486 try { 487 payload = params.get("transport").equals("longpolljsonp") ? 488 params.get("callback") + "(" + new ObjectMapper().writeValueAsString(data) + ");" : 489 data; 490 } catch (JsonProcessingException e) { 491 throw new RuntimeException(e); 492 } 493 http.close(payload); 494 } 495 } 496 497 @Override 498 synchronized void close() { 499 ServerHttpExchange http = httpRef.getAndSet(null); 500 if (http != null && !closed.get()) { 501 http.close(); 502 } 503 } 504 } 505 506 private class DefaultSocket implements Socket { 507 final Transport transport; 508 Set<String> tags = new CopyOnWriteArraySet<>(); 509 ConcurrentMap<String, Actions<Object>> actionsMap = new ConcurrentHashMap<>(); 510 ConcurrentMap<String, Action<Object>> replyMap = new ConcurrentHashMap<>(); 511 512 DefaultSocket(final Transport transport) { 513 this.transport = transport; 514 transport.closeActions.add(new VoidAction() { 515 @Override 516 public void on() { 517 sockets.remove(transport.params.get("id")); 518 Actions<Object> closeActions = actionsMap.get("close"); 519 if (closeActions != null) { 520 closeActions.fire(); 521 } 522 } 523 }); 524 transport.messageActions.add(new Action<String>() { 525 @Override 526 public void on(String text) { 527 final Map<String, Object> event = parseEvent(text); 528 Actions<Object> actions = actionsMap.get(event.get("type")); 529 if (actions != null) { 530 if ((Boolean) event.get("reply")) { 531 actions.fire(new Reply<Object>() { 532 @Override 533 public Object data() { 534 return event.get("data"); 535 } 536 537 @Override 538 public void done() { 539 done(null); 540 } 541 542 @Override 543 public void done(Object value) { 544 sendReply(value, false); 545 } 546 547 @Override 548 public void fail() { 549 fail(null); 550 } 551 552 @Override 553 public void fail(Object value) { 554 sendReply(value, true); 555 } 556 557 AtomicBoolean sent = new AtomicBoolean(); 558 private void sendReply(Object value, boolean exception) { 559 if (sent.compareAndSet(false, true)) { 560 Map<String, Object> result = new LinkedHashMap<String, Object>(); 561 result.put("id", event.get("id")); 562 result.put("data", value); 563 result.put("exception", exception); 564 send("reply", result); 565 } 566 } 567 }); 568 } else { 569 actions.fire(event.get("data")); 570 } 571 } 572 } 573 }); 574 on("reply", new Action<Map<String, Object>>() { 575 @Override 576 public void on(Map<String, Object> info) { 577 Action<Object> reply = replyMap.remove(info.get("id")); 578 if (reply != null) { 579 reply.on(info.get("data")); 580 } else { 581 log.error("Reply callback not found in socket#{} with info, {}", id(), info); 582 } 583 } 584 }); 585 try { 586 new HeartbeatHelper(Long.valueOf(transport.params.get("heartbeat"))); 587 } catch (NumberFormatException e) {} 588 589 sockets.put(id(), this); 590 } 591 592 class HeartbeatHelper { 593 final long delay; 594 AtomicReference<Timer> timer = new AtomicReference<>(); 595 596 HeartbeatHelper(long delay) { 597 this.delay = delay; 598 timer.set(createTimer()); 599 on("heartbeat", new VoidAction() { 600 @Override 601 public void on() { 602 timer.getAndSet(createTimer()).cancel(); 603 send("heartbeat"); 604 } 605 }); 606 } 607 608 Timer createTimer() { 609 Timer timer = new Timer(true); 610 timer.schedule(new TimerTask() { 611 @Override 612 public void run() { 613 close(); 614 } 615 }, delay); 616 return timer; 617 } 618 } 619 620 @Override 621 public String id() { 622 return transport.params.get("id"); 623 } 624 625 @Override 626 public String uri() { 627 return transport.uri(); 628 } 629 630 @Override 631 public Set<String> tags() { 632 return tags; 633 } 634 635 @SuppressWarnings("unchecked") 636 @Override 637 public <T> Socket on(String event, Action<T> action) { 638 Actions<Object> actions = actionsMap.get(event); 639 if (actions == null) { 640 Actions<Object> value = new ConcurrentActions<>(); 641 actions = actionsMap.putIfAbsent(event, value); 642 if (actions == null) { 643 actions = value; 644 } 645 } 646 actions.add((Action<Object>) action); 647 return this; 648 } 649 650 @SuppressWarnings("unchecked") 651 @Override 652 public <T> Socket off(String event, Action<T> action) { 653 Actions<Object> actions = actionsMap.get(event); 654 if (actions != null) { 655 actions.remove((Action<Object>) action); 656 } 657 return this; 658 } 659 660 @Override 661 public Socket send(String event) { 662 return send(event, null); 663 } 664 665 @Override 666 public Socket send(String event, Object data) { 667 return send(event, data, null); 668 } 669 670 @SuppressWarnings("unchecked") 671 @Override 672 public <T> Socket send(String type, Object data, Action<T> reply) { 673 String eventId = UUID.randomUUID().toString(); 674 Map<String, Object> event = new LinkedHashMap<String, Object>(); 675 676 event.put("id", eventId); 677 event.put("type", type); 678 event.put("data", data); 679 event.put("reply", reply != null); 680 681 String text = stringifyEvent(event); 682 transport.send(text); 683 if (reply != null) { 684 replyMap.put(eventId, (Action<Object>) reply); 685 } 686 return this; 687 } 688 689 @Override 690 public Socket close() { 691 transport.close(); 692 return this; 693 } 694 695 @Override 696 public int hashCode() { 697 final int prime = 31; 698 int result = 1; 699 result = prime * result + ((id() == null) ? 0 : id().hashCode()); 700 return result; 701 } 702 703 @Override 704 public boolean equals(Object obj) { 705 if (this == obj) 706 return true; 707 if (obj == null) 708 return false; 709 if (getClass() != obj.getClass()) 710 return false; 711 DefaultSocket other = (DefaultSocket) obj; 712 if (id() == null) { 713 if (other.id() != null) 714 return false; 715 } else if (!id().equals(other.id())) 716 return false; 717 return true; 718 } 719 } 720 721}