Case Study: Intermodal Delivery Management System

Table of Contents

1. Overview

INMODS is a simple application system that demonstrates the current state-of-the-art development practices.

User Story:

Users can enter delivery orders with start and end locations and track their states in real-time on a map.

The system is based on a microservice architecture with services that are implemented either in Java/Spring or Python. It can be run locally on a desktop computer or as docker containers as well as on Kubernetes cloud platforms.

inmods-arch.png

1.1. INMODS Services

Order Processing
creates, modifies and tracks delivery orders
Fleet Manager
allocates vehicles and assigns delivery jobs
Route Planner
finds the shortest route between start and end points of deliveries
Delivery Tracking
displays real-time locations of the vehicles on a map.
Vehicle
a means of transporting items (bicycle, car, truck, train, ship, airplane) which sends GPS location events in regular intervals
Kafka
a distributed event streaming platform used to persist movement events of vehicles

1.2. Development Prerequisites

  • Java JDK >=17
  • Python >=3.12
  • IDE (Eclipse, IntelliJ, VSCode/Studio..)
  • Docker
  • Kubernetes (Minikube, K3d ..)

2. Route Planner

The route planner finds the shortest route between start and end points of deliveries. A route is a collection of links between 2 cities. Each city has a (unique) name and a location described by geo coordinates (latitude, longitude) .

The route-planner is a Java Spring-Boot project with Maven.

  1. Go to https://start.spring.io and enter the shown values:

    route-planner-spring-starter.png

    Figure 1: INMODS Architecture

    with the dependencies: DevTools, Spring Web, Spring Data Redis

  2. Activate "GENERATE", download the ZIP-File route-planner.zip und unpack this file into the project directory.
  3. Initialize the git repository and commit the created directory.

    git init
    git add route-planner
    git commit -m "initial commit"
    
  4. Open your Java IDE (Intellij, Eclipse, VisualCode, Netbeans) and import route-planner files as a Maven project.
  5. Create the package org.inmods.routeplanner.model
  6. Create the class City with its getter and setter methods:

    package org.inmods.routeplanner.model;
    
    import org.springframework.data.annotation.Id;
    import org.springframework.data.geo.Point;
    import org.springframework.data.redis.core.RedisHash;
    import org.springframework.data.redis.core.index.GeoIndexed;
    import org.springframework.data.redis.core.index.Indexed;
    
    @RedisHash(value = "city")
    public class City {
        @Id
        String id;
        @Indexed
        String name;
        Integer population;
        @GeoIndexed
        Point location;
        String country;
    }
    
  7. Add equal and hash methods:

    public boolean equals(Object obj){
        if(this == obj)
            return true;
        if((obj == null) || (obj.getClass() != this.getClass()))
            return false;
        City c = (City)obj;
        if( this.id != null) {
            return this.id.equals(c.id);
        }
        return this.name.equals(c.name);
    }
    
    public int hashCode(){
        int hash = 7;
        hash = 31 * hash + id.hashCode();
        return hash;
    }
    
  8. Create the Class Link with its setter and getter methods:

    package org.inmods.routeplanner.model;
    
    import java.util.List;
    import org.springframework.data.redis.core.RedisHash;
    
    /**
     * represents a connection between 2 cities
     */
    @RedisHash(value = "link")
    public class Link {
      public enum TransportMode {
        TRAIN, PLANE, BICYCLE, TRUCK, SHIP;
      }
    
      private String id;
      private String from;
      private String to;
      private Double weight;
      private TransportMode transportMode;
    }
    
  9. Add package org.inmods.routeplanner.repository
  10. Create the interface CityRepository (Point, Distance are geo data):

    package org.inmods.routeplanner.repository;
    public interface CityRepository extends CrudRepository<City, String> {
        List<City> findByName(String name);
        List<City> findByLocationNear(Point point, Distance distance);
    }
    
  11. Create the interface LinkRepository:

    package org.inmods.routeplanner.repository;
    public interface LinkRepository extends CrudRepository<Link, String> { }
    
  12. Create the class DatabaseLoader

    package org.inmods.routeplanner;
    
    import java.io.InputStream;
    import java.util.Scanner;
    import org.inmods.routeplanner.model.City;
    import org.inmods.routeplanner.repository.CityRepository;
    import org.inmods.routeplanner.repository.LinkRepository;
    import org.inmods.routeplanner.model.Link;
    import org.inmods.routeplanner.model.Link.TransportMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Profile;
    import org.springframework.data.geo.Point;
    import org.springframework.stereotype.Component;
    
    @Component
    @Profile("!mocktest")
    public class DatabaseLoader implements CommandLineRunner {
    
      Logger logger = LoggerFactory.getLogger(DatabaseLoader.class);
    
      @Autowired
      private CityRepository cityRepository;
      @Autowired
      private LinkRepository linkRepository;
    
      @Value("${routeplanner.cities}")
      String cityfile;
      @Value("${routeplanner.links}")
      String linkfile;
    
      @Override
      public void run(String... strings) {
        long numCities = cityRepository.count();
        long numLinks = linkRepository.count();
        if (numCities == 0) {
          numCities = loadCities();
          numLinks = loadLinks();
        }
        logger.info("Total {} cities {} links", numCities, numLinks);
      }
    
      private int loadCities() {
        int nrecs = 0;
        InputStream in = this.getClass().getResourceAsStream(cityfile);
        Scanner sc = new Scanner(in, "UTF-8");
        while (sc.hasNextLine()) {
          String[] recs = sc.nextLine().split("\t");
          City c = new City();
          c.setName(recs[0].strip());
          c.setCountry(recs[1].strip());
          c.setPopulation(Integer.valueOf(recs[2]));
          c.setLocation(new Point(Double.valueOf(recs[4]), Double.valueOf(recs[3])));
          cityRepository.save(c);
          nrecs++;
        }
        return nrecs;
      }
    
      private int loadLinks() {
        int nrecs = 0;
        InputStream in = this.getClass().getResourceAsStream(linkfile);
        Scanner sc = new Scanner(in, "UTF-8");
        while (sc.hasNextLine()) {
          String[] recs = sc.nextLine().split("\t");
          if (recs.length < 2) {
            logger.warn("illegal record at line {}", nrecs);
          } else {
            try {
              City a = cityRepository.findByName(recs[0].strip()).get(0);
              City b = cityRepository.findByName(recs[1].strip()).get(0);
              Double dist = a.distance(b);
              Link l = new Link(a.getName(), b.getName(), dist, TransportMode.TRAIN);
              linkRepository.save(l);
              nrecs++;
            } catch (IndexOutOfBoundsException e) {
              logger.warn("City not found in {} {}", recs[0], recs[1]);
            }
          }
        }
        return nrecs;
      }
    
    }
    
  13. add the Constructors to the Link class:

    public Link(){}
    
    public Link(String from, String to,
        Double dist, TransportMode tmode) {
      this.from = from;
      this.to = to;
      this.weight = dist;
      this.transportMode = tmode;
    }
    
  14. Create the method distance in City class:

    public double distance(City to){
      if( this.location.getY() == to.location.getY() &&
          this.location.getX() == to.location.getX() )
        return 0.0;
      // convert coordinates into radian
      double k = Math.PI/180;
      double a = this.location.getY()*k; // Latitude
      double b = this.location.getX()*k; // Longitude
      double c = to.location.getY()*k;   // Latitude
      double d = to.location.getX()*k;   // Longitude
    
      double x = Math.sin(a)*Math.sin(c)+
         Math.cos(a)*Math.cos(c)*Math.cos(b-d);
      double radius = 6371.007176; // in km
      if (x > 1) x = 1.0;
      return radius*Math.acos(x);
    }
    
  15. Add the file cities.txt and links.txt to the resources directory.
  16. add the lines to the application.properties:

    routeplanner.cities=/cities.txt
    routeplanner.links=/links.txt
    
  17. Start redis: docker run --rm -p 6379:6379 redis:7
  18. Start RoutePlannerApplication
  19. Add the package org.inmods.routeplanner.service
  20. Create the class RouteService

    
    
  21. add the method isIncludedIn to the class Link:

    /**
     * check if both ends of this link are included in the array of cities
     *
     * @param cities
     * @return true if this link is included in cities
     */
    public boolean isIncludedIn(List<City> cities) {
      for (City c : cities) {
        if (c.getName().equals(from) || c.getName().equals(to)) {
          return true;
        }
      }
      return false;
    }
    
  22. create the package error and the class CityNotFoundException with following content:

    package org.inmods.routeplanner.error;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.ResponseStatus;
    
    @ResponseStatus(value= HttpStatus.NOT_FOUND, reason="No such City")
    public class CityNotFoundException extends RuntimeException{
        public CityNotFoundException(String name){
        super("City \"" + name + "\"");
      }
    }
    
  23. Add package org.inmods.routeplanner.controller
  24. Create class RouteController

    package org.inmods.routeplanner.controller;
    @RestController
    public class RouteController {
    
      @Autowired
      RouteService routeService;
    
      @GetMapping("/route/{from}/{to}")
      public List<Link> findShortestRoute(@PathVariable String from, @PathVariable String to){
        return routeService.findShortestPath(from, to);
      }
    
      @GetMapping("/city/{name}")
      public City findCity(@PathVariable String name){
        return routeService.findCity(name);
      }
    
      @GetMapping("/cities/")
      public Iterable<String> getCities(@PathVariable String country){
        return routeService.findCities(country);
      }
    
      @GetMapping("/countries")
      public Iterable<String> getCountries(){
        return routeService.findCountries();
      }
    }
    

3. Fleet Manager

The Fleet Manager allocates vehicles and assigns delivery jobs. This is a python project with flask.

  1. create the module directories in the project directory

    mkdir fleet-manager/
    
    mkdir  fleet-manager/manager
    mkdir  fleet-manager/etc
    mkdir fleet-manager/tests
    
  2. Add the file requirements.txt with following content to the fleet-manager directory

    pyproj
    pyzmq
    flask
    pyaml
    six
    kafka-python-ng
    
  3. Create a new python virtual environment inmods-venv, activate it and install the requirements:

    python -m venv inmods-venv
    source inmods-venv/bin/activate
    pip install -r fleet-manager/requirements.txt
    
  4. create the config file config.yaml with following content in the directory fleet-manager/etc:

    route_planner:
      host: 'route-planner'
      port: 8080
    event_streaming:
      host: 'kafka'
      port: 9092
    vehicles:
      - city: Basel
        speed: 20
      - city: Aarau
        speed: 16
    
  5. Add the file manager.py to the directory fleet-manager/manager:

    import logging
    import pyproj   #  generic coordinate transformation of geospatial coordinates
    from .vehicle import Vehicle
    import time
    import zmq
    import uuid
    import threading
    
    geod = pyproj.Geod(ellps='WGS84')
    event_url = "inproc://events"
    context = zmq.Context().instance()
    
    
    def get_waypts(a, b):
        """return km spaced list of (lon/lat) pairs between a and b
        Args:
          a: start location
          b: end location"""
        dist = geod.inv(*a, *b)[-1]
        return [a] + geod.npts(*a, *b, 1+int(dist/1000))
    
    
    class FleetManager(object):
    
        def __init__(self, route_planner,
                     kafka_host='localhost'):
            self.route_planner = route_planner
            self.event_receiver = context.socket(zmq.PULL)
            self.event_receiver.bind(event_url)
    
            self.vehicles = dict()
            self.kafka_host = kafka_host
    
        def add_vehicle(self, speed, city):
            loc = self.route_planner.get_locations([city])[0]
            v = Vehicle(str(uuid.uuid4())[:8],
                        speed,
                        loc,
                        event_url, self.kafka_host)
            self.vehicles[v.id] = dict(vehicle=v, city=city)
            return v.id
    
        def set_job(self, fromcity, tocity):
            """assigns delivery job to vehicle closest to fromcity"""
            cand = []
            for k in self.vehicles:
                if not self.vehicles[k]['vehicle'].is_running:
                    if self.vehicles[k]['city'] == fromcity:
                        cand = [(self.vehicles[k],
                                 dict(itinerary=[], total=0))]
                        break
                    pickup = self.route_planner.get_route(
                        self.vehicles[k]['city'], fromcity)
                    if pickup:
                        cand.append((self.vehicles[k], pickup))
    
            try:
                v, pickup = sorted(cand, key=lambda k: k[1]['total'])[0]
                r = self.route_planner.get_route(fromcity, tocity)
                ity = self.route_planner.get_locations(pickup['itinerary'] + r['itinerary'])
                logging.info("  itinerary %s", ity)
                wp = [x for y in zip(ity, ity[1:])
                      for x in get_waypts(y[0], y[1])]
    
                v['vehicle'].start(wp + [ity[-1]])
    
                v['city']=tocity
                return dict(vehicle=v['vehicle'].id,
                            dist=r['total'], pickup=pickup['total'])
            except Exception:
                logging.exception("no route found")
            return dict()
    
        def start(self):
            self.thread = threading.Thread(target=self.handle_events,
                                           daemon=True)
            self.__is_active = True
            self.thread.start()
    
        def stop(self):
            """stop all running vehicles"""
            for k in self.vehicles.keys():
                self.vehicles[k]['vehicle'].stop()
    
        def handle_events(self):
            while self.__is_active:
                msg = self.event_receiver.recv()
                logging.info(msg)
            # clean up
            self.stop()
            self.event_receiver.close()
            context.term()
    
        def ready(self):
            return "Yes"
    
        def join(self):
            try:
                self.thread.join()
            except KeyboardInterrupt:
                logging.info("Terminate")
            self.__is_active = False
            self.thread.join()
    
    
    if __name__ == '__main__':
        import random
        from .routeplanner import RoutePlanner
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(message)s')
    
        route_planner = RoutePlanner('localhost', 8080)
    
        s = FleetManager(route_planner)
        v = [s.add_vehicle( random.randrange(5, 30), 'Basel'),
             s.add_vehicle( random.randrange(5, 30), 'Aarau')]
    
        s.start()
        logging.info("Ready")
        time.sleep(4)
    
        #
        vid, d = s.set_job('Basel', 'Aarau')
        logging.info("Assigned %s total dist %f", vid, d)
        time.sleep(1)
        vid, d = s.set_job('Basel', 'Aarau')
        logging.info("Assigned %s total dist %d", vid, d)
    
    
        s.join()
    
  6. Add the file vehicle.py to the directory fleet-manager/manager

    import time
    import logging
    import threading
    import json
    import zmq
    # this is a python 3.12 hack with kafka:
    #  No module named 'kafka.vendor.six.moves'
    import six
    import sys
    if sys.version_info >= (3, 12, 0):
        sys.modules['kafka.vendor.six.moves'] = six.moves
    import kafka
    
    context = zmq.Context().instance()
    
    class Vehicle(object):
    
        def __init__(self, id, speed, location, sink_url, event_streamer={}):
            self.id = id
            self.location = location
            self.speed = speed
            self.departure = 0
            self.scale_factor = 25
            self.sink_url = sink_url
            self.sender = context.socket(zmq.PUSH)
            self.sender.connect(self.sink_url)
            self.is_running = False
            try:
                host = event_streamer.get('host') or 'localhost'
                port = event_streamer.get('port') or 9092
                servers = f'{host}:{port}'
                logging.info(f'bootstrap_servers={servers}')
               self.producer = kafka.KafkaProducer(
                    bootstrap_servers=f'{servers}',
                    api_version=(0, 10, 1),
                    value_serializer=lambda v: json.dumps(v).encode())
                self.topic = 'vehicle-positions'
                logging.info(f'bootstrap_servers={servers}')
            except:
                logging.exception("no event streaming")
    
                self.topic = False
    
        def start(self, itinerary):
            """set itinerary (list of ordered waypoints) and start voyage"""
            self.thread = threading.Thread(target=self.run, args=(itinerary,),
                                           daemon=True)
            self.is_running = True
            self.thread.start()
    
        def stop(self):
            self.is_running = False
            self.sender.close()
    
        def run(self, itinerary):
            """calculates distance and moves to new position
            returns if end of itinerary is reached
            """
            s = 0
            self.departure = time.time()
            time_step = 1000/self.speed/self.scale_factor
            while self.is_running:
                try:
                    dt = time.time() - self.departure
                    s = self.speed * dt * self.scale_factor
                    self.location = itinerary[int(round(s/1000))]
                    msg = dict(location=self.location,
                               id=self.id)
                    if self.topic:
                        self.producer.send(self.topic, msg)
                    self.sender.send_json(msg)
    
                    time.sleep(time_step)
                except IndexError:
                    self.is_running = False
    
            logging.info("Vehicle {0} stopped: pos {1} dist {2:5.1f}".format(
                        self.id, self.location, s/1000))
            self.departure = 0
            self.sender.send_string("{}".format(self.id))
    
        def join(self):
            self.thread.join()
    
  7. Add the file routeplanner.py to the directory fleet-manager/manager:

    import logging
    import urllib.parse
    import urllib.request
    import json
    
    class RoutePlanner(object):
        def __init__(self, host, port):
            self.url = 'http://{}:{}'.format(host, port)
    
        def get_route(self, fromcity, tocity):
            """returns itinerary and total distance for route fromcity to tocity"""
            resp = urllib.request.urlopen(
                self.url + urllib.parse.quote(f'/route/{fromcity}/{tocity}'))
            if resp.getcode() == 200:
                r = json.loads(resp.read().decode())
                try:
                    return dict(itinerary=[i['from'] for i in r] +[r[-1]['to']],
                                total=sum([l['weight'] for l in r]))
                except:
                    pass
            return dict()  # no route found
    
        def get_locations(self, itinerary):
            """returns list of (lon,lat) pairs of cities in itinerary"""
            loc = []
            for c in itinerary:
                resp = urllib.request.urlopen(self.url+urllib.parse.quote(f'/city/{c}'))
                if resp.getcode() == 200:
                    r = json.loads(resp.read().decode())
                    loc.append((r['location']['x'], r['location']['y']))
                else:
                    logging.warn(resp)
            return loc
    
        def get_cities(self):
            """returns a list of countries and cities"""
            d = dict()
            resp = urllib.request.urlopen(self.url+'/countries')
            if resp.getcode() == 200:
                for c in json.loads(resp.read().decode()):
                    country = urllib.parse.quote(c)
                    resp = urllib.request.urlopen(
                        self.url+urllib.parse.quote(f'/cities/{country}'))
                    if resp.getcode() == 200:
                        d[c] = json.loads(resp.read().decode())
            return d
    
    
  8. Add the file __init__.py to the directory fleet-manager/manager:

    from flask import Flask, jsonify, request, abort
    import os
    import json
    import yaml
    import logging
    
    from pathlib import Path
    from .manager import FleetManager
    from .routeplanner import RoutePlanner
    
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(message)s')
    
    def create_app(test_config=None):
        # default location of config file. config-localhost.yaml is used if the environment variable isn’t set
        config_file = Path(os.environ.get('CONFIG_FILE', 'etc/config-localhost.yaml'))
    
        # verify file exists before attempting to read and extend the configuration
        if config_file.is_file():
            config = yaml.load(config_file.read_text(), Loader=yaml.FullLoader)
        else:
            logging.warning("config_file %s not found", config_file)
            config = dict()
        route_planner = config.get('route_planner', dict(host='localhost', port=8080))
        kafka_host = config.get('event_streaming', '')
    
        routePlanner = RoutePlanner(route_planner['host'], route_planner['port'])
        fleetManager = FleetManager(routePlanner,
                                    kafka_host)
    
        app = Flask(__name__)
    
        #@app.before_first_request
        def activate():
            for v in config['vehicles']:
                   id = fleetManager.add_vehicle(v['speed'], v['city'])
                   app.logger.info('Vehicle %s added at %s', id, v['city'])
            fleetManager.start()
    
        @app.route('/cities')
        def cities():
            return jsonify(routePlanner.get_cities())
    
        @app.route('/job', methods = ['POST'])
        def job():
            rec = json.loads(request.data)
            app.logger.info(rec)
            try:
                job = fleetManager.set_job(
                    rec['fromCity'], rec['toCity'])
                if job:
                    return jsonify(job)
            except:
                app.logger.exception("no route found")
            return "No route found", 400
    
        @app.route('/alive')
        def alive():
            return "Yes"
    
        @app.route('/ready')
        def ready():
            if fleetManager.ready():
                return "Yes"
            else:
                abort(500)
    
        with app.app_context():
            activate()
        return app
    
    if __name__ == '__main__':
        app = create_app()
    
    
  9. create the file config-localhost.yaml with following content in the directory fleet-manager:

    route_planner:
      host: 'localhost'
      port: 8080
    vehicles:
      - city: Basel
        speed: 20
      - city: Aarau
        speed: 16
    
  10. Start fleet-manager

    flask --app manager run
    

4. Order Processing

5. Delivery Tracking

Author: Ronald Tanner

Created: 2025-06-26 Thu 08:56

Validate