Case Study: Intermodal Delivery Management System
Table of Contents
1. Overview
INMODS is a simple application system that demonstrates the current state-of-the-art development practices.
User Story:
Users can enter delivery orders with start and end locations and track their states in real-time on a map.
The system is based on a microservice architecture with services that are implemented either in Java/Spring or Python. It can be run locally on a desktop computer or as docker containers as well as on Kubernetes cloud platforms.
1.1. INMODS Services
- Order Processing
- creates, modifies and tracks delivery orders
- Fleet Manager
- allocates vehicles and assigns delivery jobs
- Route Planner
- finds the shortest route between start and end points of deliveries
- Delivery Tracking
- displays real-time locations of the vehicles on a map.
- Vehicle
- a means of transporting items (bicycle, car, truck, train, ship, airplane) which sends GPS location events in regular intervals
- Kafka
- a distributed event streaming platform used to persist movement events of vehicles
1.2. Development Prerequisites
- Java JDK >=17
- Python >=3.12
- IDE (Eclipse, IntelliJ, VSCode/Studio..)
- Docker
- Kubernetes (Minikube, K3d ..)
2. Route Planner
The route planner finds the shortest route between start and end points of deliveries. A route is a collection of links between 2 cities. Each city has a (unique) name and a location described by geo coordinates (latitude, longitude) .
The route-planner is a Java Spring-Boot project with Maven.
Go to https://start.spring.io and enter the shown values:
Figure 1: INMODS Architecture
with the dependencies: DevTools, Spring Web, Spring Data Redis
- Activate "GENERATE", download the ZIP-File
route-planner.zip
und unpack this file into the project directory. Initialize the git repository and commit the created directory.
git init git add route-planner git commit -m "initial commit"
- Open your Java IDE (Intellij, Eclipse, VisualCode, Netbeans) and import route-planner files as a Maven project.
- Create the package
org.inmods.routeplanner.model
Create the class City with its getter and setter methods:
package org.inmods.routeplanner.model; import org.springframework.data.annotation.Id; import org.springframework.data.geo.Point; import org.springframework.data.redis.core.RedisHash; import org.springframework.data.redis.core.index.GeoIndexed; import org.springframework.data.redis.core.index.Indexed; @RedisHash(value = "city") public class City { @Id String id; @Indexed String name; Integer population; @GeoIndexed Point location; String country; }
Add equal and hash methods:
public boolean equals(Object obj){ if(this == obj) return true; if((obj == null) || (obj.getClass() != this.getClass())) return false; City c = (City)obj; if( this.id != null) { return this.id.equals(c.id); } return this.name.equals(c.name); } public int hashCode(){ int hash = 7; hash = 31 * hash + id.hashCode(); return hash; }
Create the Class Link with its setter and getter methods:
package org.inmods.routeplanner.model; import java.util.List; import org.springframework.data.redis.core.RedisHash; /** * represents a connection between 2 cities */ @RedisHash(value = "link") public class Link { public enum TransportMode { TRAIN, PLANE, BICYCLE, TRUCK, SHIP; } private String id; private String from; private String to; private Double weight; private TransportMode transportMode; }
- Add package
org.inmods.routeplanner.repository
Create the interface
CityRepository
(Point, Distance are geo data):package org.inmods.routeplanner.repository; public interface CityRepository extends CrudRepository<City, String> { List<City> findByName(String name); List<City> findByLocationNear(Point point, Distance distance); }
Create the interface LinkRepository:
package org.inmods.routeplanner.repository; public interface LinkRepository extends CrudRepository<Link, String> { }
Create the class DatabaseLoader
package org.inmods.routeplanner; import java.io.InputStream; import java.util.Scanner; import org.inmods.routeplanner.model.City; import org.inmods.routeplanner.repository.CityRepository; import org.inmods.routeplanner.repository.LinkRepository; import org.inmods.routeplanner.model.Link; import org.inmods.routeplanner.model.Link.TransportMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Profile; import org.springframework.data.geo.Point; import org.springframework.stereotype.Component; @Component @Profile("!mocktest") public class DatabaseLoader implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(DatabaseLoader.class); @Autowired private CityRepository cityRepository; @Autowired private LinkRepository linkRepository; @Value("${routeplanner.cities}") String cityfile; @Value("${routeplanner.links}") String linkfile; @Override public void run(String... strings) { long numCities = cityRepository.count(); long numLinks = linkRepository.count(); if (numCities == 0) { numCities = loadCities(); numLinks = loadLinks(); } logger.info("Total {} cities {} links", numCities, numLinks); } private int loadCities() { int nrecs = 0; InputStream in = this.getClass().getResourceAsStream(cityfile); Scanner sc = new Scanner(in, "UTF-8"); while (sc.hasNextLine()) { String[] recs = sc.nextLine().split("\t"); City c = new City(); c.setName(recs[0].strip()); c.setCountry(recs[1].strip()); c.setPopulation(Integer.valueOf(recs[2])); c.setLocation(new Point(Double.valueOf(recs[4]), Double.valueOf(recs[3]))); cityRepository.save(c); nrecs++; } return nrecs; } private int loadLinks() { int nrecs = 0; InputStream in = this.getClass().getResourceAsStream(linkfile); Scanner sc = new Scanner(in, "UTF-8"); while (sc.hasNextLine()) { String[] recs = sc.nextLine().split("\t"); if (recs.length < 2) { logger.warn("illegal record at line {}", nrecs); } else { try { City a = cityRepository.findByName(recs[0].strip()).get(0); City b = cityRepository.findByName(recs[1].strip()).get(0); Double dist = a.distance(b); Link l = new Link(a.getName(), b.getName(), dist, TransportMode.TRAIN); linkRepository.save(l); nrecs++; } catch (IndexOutOfBoundsException e) { logger.warn("City not found in {} {}", recs[0], recs[1]); } } } return nrecs; } }
add the Constructors to the Link class:
public Link(){} public Link(String from, String to, Double dist, TransportMode tmode) { this.from = from; this.to = to; this.weight = dist; this.transportMode = tmode; }
Create the method distance in City class:
public double distance(City to){ if( this.location.getY() == to.location.getY() && this.location.getX() == to.location.getX() ) return 0.0; // convert coordinates into radian double k = Math.PI/180; double a = this.location.getY()*k; // Latitude double b = this.location.getX()*k; // Longitude double c = to.location.getY()*k; // Latitude double d = to.location.getX()*k; // Longitude double x = Math.sin(a)*Math.sin(c)+ Math.cos(a)*Math.cos(c)*Math.cos(b-d); double radius = 6371.007176; // in km if (x > 1) x = 1.0; return radius*Math.acos(x); }
- Add the file cities.txt and links.txt to the resources directory.
add the lines to the application.properties:
routeplanner.cities=/cities.txt routeplanner.links=/links.txt
- Start redis:
docker run --rm -p 6379:6379 redis:7
- Start RoutePlannerApplication
- Add the package org.inmods.routeplanner.service
Create the class RouteService
add the method isIncludedIn to the class Link:
/** * check if both ends of this link are included in the array of cities * * @param cities * @return true if this link is included in cities */ public boolean isIncludedIn(List<City> cities) { for (City c : cities) { if (c.getName().equals(from) || c.getName().equals(to)) { return true; } } return false; }
create the package error and the class CityNotFoundException with following content:
package org.inmods.routeplanner.error; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ResponseStatus; @ResponseStatus(value= HttpStatus.NOT_FOUND, reason="No such City") public class CityNotFoundException extends RuntimeException{ public CityNotFoundException(String name){ super("City \"" + name + "\""); } }
- Add package org.inmods.routeplanner.controller
Create class RouteController
package org.inmods.routeplanner.controller; @RestController public class RouteController { @Autowired RouteService routeService; @GetMapping("/route/{from}/{to}") public List<Link> findShortestRoute(@PathVariable String from, @PathVariable String to){ return routeService.findShortestPath(from, to); } @GetMapping("/city/{name}") public City findCity(@PathVariable String name){ return routeService.findCity(name); } @GetMapping("/cities/") public Iterable<String> getCities(@PathVariable String country){ return routeService.findCities(country); } @GetMapping("/countries") public Iterable<String> getCountries(){ return routeService.findCountries(); } }
3. Fleet Manager
The Fleet Manager allocates vehicles and assigns delivery jobs. This is a python project with flask.
create the module directories in the project directory
mkdir fleet-manager/ mkdir fleet-manager/manager mkdir fleet-manager/etc mkdir fleet-manager/tests
Add the file
requirements.txt
with following content to the fleet-manager directorypyproj pyzmq flask pyaml six kafka-python-ng
Create a new python virtual environment
inmods-venv
, activate it and install the requirements:python -m venv inmods-venv source inmods-venv/bin/activate pip install -r fleet-manager/requirements.txt
create the config file
config.yaml
with following content in the directory fleet-manager/etc:route_planner: host: 'route-planner' port: 8080 event_streaming: host: 'kafka' port: 9092 vehicles: - city: Basel speed: 20 - city: Aarau speed: 16
Add the file
manager.py
to the directory fleet-manager/manager:import logging import pyproj # generic coordinate transformation of geospatial coordinates from .vehicle import Vehicle import time import zmq import uuid import threading geod = pyproj.Geod(ellps='WGS84') event_url = "inproc://events" context = zmq.Context().instance() def get_waypts(a, b): """return km spaced list of (lon/lat) pairs between a and b Args: a: start location b: end location""" dist = geod.inv(*a, *b)[-1] return [a] + geod.npts(*a, *b, 1+int(dist/1000)) class FleetManager(object): def __init__(self, route_planner, kafka_host='localhost'): self.route_planner = route_planner self.event_receiver = context.socket(zmq.PULL) self.event_receiver.bind(event_url) self.vehicles = dict() self.kafka_host = kafka_host def add_vehicle(self, speed, city): loc = self.route_planner.get_locations([city])[0] v = Vehicle(str(uuid.uuid4())[:8], speed, loc, event_url, self.kafka_host) self.vehicles[v.id] = dict(vehicle=v, city=city) return v.id def set_job(self, fromcity, tocity): """assigns delivery job to vehicle closest to fromcity""" cand = [] for k in self.vehicles: if not self.vehicles[k]['vehicle'].is_running: if self.vehicles[k]['city'] == fromcity: cand = [(self.vehicles[k], dict(itinerary=[], total=0))] break pickup = self.route_planner.get_route( self.vehicles[k]['city'], fromcity) if pickup: cand.append((self.vehicles[k], pickup)) try: v, pickup = sorted(cand, key=lambda k: k[1]['total'])[0] r = self.route_planner.get_route(fromcity, tocity) ity = self.route_planner.get_locations(pickup['itinerary'] + r['itinerary']) logging.info(" itinerary %s", ity) wp = [x for y in zip(ity, ity[1:]) for x in get_waypts(y[0], y[1])] v['vehicle'].start(wp + [ity[-1]]) v['city']=tocity return dict(vehicle=v['vehicle'].id, dist=r['total'], pickup=pickup['total']) except Exception: logging.exception("no route found") return dict() def start(self): self.thread = threading.Thread(target=self.handle_events, daemon=True) self.__is_active = True self.thread.start() def stop(self): """stop all running vehicles""" for k in self.vehicles.keys(): self.vehicles[k]['vehicle'].stop() def handle_events(self): while self.__is_active: msg = self.event_receiver.recv() logging.info(msg) # clean up self.stop() self.event_receiver.close() context.term() def ready(self): return "Yes" def join(self): try: self.thread.join() except KeyboardInterrupt: logging.info("Terminate") self.__is_active = False self.thread.join() if __name__ == '__main__': import random from .routeplanner import RoutePlanner logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') route_planner = RoutePlanner('localhost', 8080) s = FleetManager(route_planner) v = [s.add_vehicle( random.randrange(5, 30), 'Basel'), s.add_vehicle( random.randrange(5, 30), 'Aarau')] s.start() logging.info("Ready") time.sleep(4) # vid, d = s.set_job('Basel', 'Aarau') logging.info("Assigned %s total dist %f", vid, d) time.sleep(1) vid, d = s.set_job('Basel', 'Aarau') logging.info("Assigned %s total dist %d", vid, d) s.join()
Add the file
vehicle.py
to the directory fleet-manager/managerimport time import logging import threading import json import zmq # this is a python 3.12 hack with kafka: # No module named 'kafka.vendor.six.moves' import six import sys if sys.version_info >= (3, 12, 0): sys.modules['kafka.vendor.six.moves'] = six.moves import kafka context = zmq.Context().instance() class Vehicle(object): def __init__(self, id, speed, location, sink_url, event_streamer={}): self.id = id self.location = location self.speed = speed self.departure = 0 self.scale_factor = 25 self.sink_url = sink_url self.sender = context.socket(zmq.PUSH) self.sender.connect(self.sink_url) self.is_running = False try: host = event_streamer.get('host') or 'localhost' port = event_streamer.get('port') or 9092 servers = f'{host}:{port}' logging.info(f'bootstrap_servers={servers}') self.producer = kafka.KafkaProducer( bootstrap_servers=f'{servers}', api_version=(0, 10, 1), value_serializer=lambda v: json.dumps(v).encode()) self.topic = 'vehicle-positions' logging.info(f'bootstrap_servers={servers}') except: logging.exception("no event streaming") self.topic = False def start(self, itinerary): """set itinerary (list of ordered waypoints) and start voyage""" self.thread = threading.Thread(target=self.run, args=(itinerary,), daemon=True) self.is_running = True self.thread.start() def stop(self): self.is_running = False self.sender.close() def run(self, itinerary): """calculates distance and moves to new position returns if end of itinerary is reached """ s = 0 self.departure = time.time() time_step = 1000/self.speed/self.scale_factor while self.is_running: try: dt = time.time() - self.departure s = self.speed * dt * self.scale_factor self.location = itinerary[int(round(s/1000))] msg = dict(location=self.location, id=self.id) if self.topic: self.producer.send(self.topic, msg) self.sender.send_json(msg) time.sleep(time_step) except IndexError: self.is_running = False logging.info("Vehicle {0} stopped: pos {1} dist {2:5.1f}".format( self.id, self.location, s/1000)) self.departure = 0 self.sender.send_string("{}".format(self.id)) def join(self): self.thread.join()
Add the file
routeplanner.py
to the directory fleet-manager/manager:import logging import urllib.parse import urllib.request import json class RoutePlanner(object): def __init__(self, host, port): self.url = 'http://{}:{}'.format(host, port) def get_route(self, fromcity, tocity): """returns itinerary and total distance for route fromcity to tocity""" resp = urllib.request.urlopen( self.url + urllib.parse.quote(f'/route/{fromcity}/{tocity}')) if resp.getcode() == 200: r = json.loads(resp.read().decode()) try: return dict(itinerary=[i['from'] for i in r] +[r[-1]['to']], total=sum([l['weight'] for l in r])) except: pass return dict() # no route found def get_locations(self, itinerary): """returns list of (lon,lat) pairs of cities in itinerary""" loc = [] for c in itinerary: resp = urllib.request.urlopen(self.url+urllib.parse.quote(f'/city/{c}')) if resp.getcode() == 200: r = json.loads(resp.read().decode()) loc.append((r['location']['x'], r['location']['y'])) else: logging.warn(resp) return loc def get_cities(self): """returns a list of countries and cities""" d = dict() resp = urllib.request.urlopen(self.url+'/countries') if resp.getcode() == 200: for c in json.loads(resp.read().decode()): country = urllib.parse.quote(c) resp = urllib.request.urlopen( self.url+urllib.parse.quote(f'/cities/{country}')) if resp.getcode() == 200: d[c] = json.loads(resp.read().decode()) return d
Add the file
__init__.py
to the directory fleet-manager/manager:from flask import Flask, jsonify, request, abort import os import json import yaml import logging from pathlib import Path from .manager import FleetManager from .routeplanner import RoutePlanner logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') def create_app(test_config=None): # default location of config file. config-localhost.yaml is used if the environment variable isn’t set config_file = Path(os.environ.get('CONFIG_FILE', 'etc/config-localhost.yaml')) # verify file exists before attempting to read and extend the configuration if config_file.is_file(): config = yaml.load(config_file.read_text(), Loader=yaml.FullLoader) else: logging.warning("config_file %s not found", config_file) config = dict() route_planner = config.get('route_planner', dict(host='localhost', port=8080)) kafka_host = config.get('event_streaming', '') routePlanner = RoutePlanner(route_planner['host'], route_planner['port']) fleetManager = FleetManager(routePlanner, kafka_host) app = Flask(__name__) #@app.before_first_request def activate(): for v in config['vehicles']: id = fleetManager.add_vehicle(v['speed'], v['city']) app.logger.info('Vehicle %s added at %s', id, v['city']) fleetManager.start() @app.route('/cities') def cities(): return jsonify(routePlanner.get_cities()) @app.route('/job', methods = ['POST']) def job(): rec = json.loads(request.data) app.logger.info(rec) try: job = fleetManager.set_job( rec['fromCity'], rec['toCity']) if job: return jsonify(job) except: app.logger.exception("no route found") return "No route found", 400 @app.route('/alive') def alive(): return "Yes" @app.route('/ready') def ready(): if fleetManager.ready(): return "Yes" else: abort(500) with app.app_context(): activate() return app if __name__ == '__main__': app = create_app()
create the file
config-localhost.yaml
with following content in the directory fleet-manager:route_planner: host: 'localhost' port: 8080 vehicles: - city: Basel speed: 20 - city: Aarau speed: 16
Start fleet-manager
flask --app manager run