Case Study: Intermodal Delivery Management System

Table of Contents

1. Overview

INMODS is a simple application system that demonstrates current state-of-the-art development practices.

User Story:

Users can enter delivery orders with start and end locations and
track their states in real-time on a map.

1.1. Architecture

The system is based on a microservice architecture with services that are implemented either in Java/Spring or Python. It can be run locally on a desktop computer or as docker containers as well as on Kubernetes cloud platforms.

inmods-arch.png

Figure 1: INMODS Architecture

INMODS Services:

Order Processing
creates, modifies and tracks delivery orders
Fleet Manager
allocates vehicles and assigns delivery jobs
Route Planner
finds the shortest route between start and end points of deliveries
Delivery Tracking
displays real-time locations of the vehicles on a map.
Vehicle
a means of transporting items (bicycle, car, truck, train, ship, airplane) which sends GPS location events at regular intervals
Event Streaming
a distributed event streaming platform used to persist movement events of vehicles

1.2. Development Prerequisites

  • Java JDK >=17
  • Python >=3.12
  • IDE (Eclipse, IntelliJ, VSCode/Studio..)
  • Docker
  • Kubernetes (Minikube, K3d ..)

2. Route Planner

The route planner finds the shortest route between the start and end points of deliveries. A route is a sequence of links, each of which connects 2 cities. Each city has a (unique) name and a location described by geo coordinates (latitude, longitude).

The route-planner is a Java Spring-Boot project with Maven.

  1. Go to https://start.spring.io and enter the shown values:

    route-planner-spring-starter.png

    Figure 2: Spring initializr for route-planner

    with the dependencies: DevTools, Spring Web, Spring Data Redis

  2. Click "GENERATE", download the ZIP file route-planner.zip and unpack it into the project directory.
  3. Initialize the git repository and commit the created directory.

    git init
    git add route-planner
    git commit -m "initial commit"
    
  4. Open your Java IDE (IntelliJ, Eclipse, VS Code, NetBeans) and import the route-planner files as a Maven project.
  5. Create the package org.inmods.routeplanner.model
  6. Create the class City with its getter and setter methods:

    package org.inmods.routeplanner.model;
    
    import org.springframework.data.annotation.Id;
    import org.springframework.data.geo.Point;
    import org.springframework.data.redis.core.RedisHash;
    import org.springframework.data.redis.core.index.GeoIndexed;
    import org.springframework.data.redis.core.index.Indexed;
    
    /**
     * A city that can be the start, the end or an intermediate stop of a route.
     * Instances are stored as Redis hashes under the name "city".
     */
    @RedisHash(value = "city")
    public class City {
            /** Identifier of the stored city. */
            @Id
            String id;
            /** Name of the city; indexed, and used by CityRepository.findByName. */
            @Indexed
            String name;
            /** Number of inhabitants. */
            Integer population;
            /** Geo position; the distance method reads x as longitude and y as latitude. */
            @GeoIndexed
            Point location;
            /** Country the city belongs to. */
            String country;
    }
    
  7. Add the equals and hashCode methods:

    /**
     * Compares this city with another object.
     *
     * Two cities are equal when they are of the same class and have the same id.
     * Cities that both have no id yet are compared by name instead. A city with
     * an id is never equal to one without, which keeps the relation symmetric.
     *
     * @param obj the object to compare with, may be null
     * @return true if obj denotes the same city
     */
    @Override
    public boolean equals(Object obj){
        if (this == obj) {
            return true;
        }
        if ((obj == null) || (obj.getClass() != this.getClass())) {
            return false;
        }
        City c = (City)obj;
        if (this.id != null || c.id != null) {
            // at least one side has an id: only identical ids match
            return this.id != null && this.id.equals(c.id);
        }
        // neither side has an id: fall back to the name (null-safe)
        return this.name != null ? this.name.equals(c.name) : c.name == null;
    }
    
    /**
     * Hash code based on the id, or on the name while the city has no id yet,
     * mirroring the fields that equals compares.
     *
     * For cities with an id the value is the same as before (31 * 7 + id.hashCode());
     * a city without an id no longer causes a NullPointerException.
     *
     * @return hash code of this city
     */
    @Override
    public int hashCode(){
        String key = (id != null) ? id : name;
        int hash = 7;
        hash = 31 * hash + (key != null ? key.hashCode() : 0);
        return hash;
    }
    
  8. Create the Class Link with its setter and getter methods:

    package org.inmods.routeplanner.model;
    
    import java.util.List;
    import org.springframework.data.redis.core.RedisHash;
    
    /**
     * Represents a connection between 2 cities.
     * Instances are stored as Redis hashes under the name "link".
     */
    @RedisHash(value = "link")
    public class Link {
      /** Means of transport that serves a link. */
      public enum TransportMode {
        TRAIN, PLANE, BICYCLE, TRUCK, SHIP;
      }
    
      /** Identifier of the link. */
      private String id;
      /** Name of the city at one end of the link. */
      private String from;
      /** Name of the city at the other end of the link. */
      private String to;
      /** Cost of travelling the link; the database loader stores the distance between the two cities here. */
      private Double weight;
      private TransportMode transportMode;
    }
    
  9. Add package org.inmods.routeplanner.repository
  10. Create the interface CityRepository (Point, Distance are geo data):

    package org.inmods.routeplanner.repository;
    /**
     * Repository for {@code City} entities, keyed by their String id.
     * Only the method signatures are declared here; no implementation is part of this file.
     */
    public interface CityRepository extends CrudRepository<City, String> {
        /**
         * @param name city name to look for
         * @return the cities with this name; callers in this project treat an empty list as "not found"
         */
        List<City> findByName(String name);

        /**
         * @param point    centre of the search area
         * @param distance radius of the search area
         * @return cities located near the point, judging by the method name within the given distance
         */
        List<City> findByLocationNear(Point point, Distance distance);
    }
    
  11. Create the interface LinkRepository:

    package org.inmods.routeplanner.repository;
    /** Repository for {@code Link} entities, keyed by their String id; it adds no methods of its own. */
    public interface LinkRepository extends CrudRepository<Link, String> { }
    
  12. Create the class DatabaseLoader

    package org.inmods.routeplanner;
    
    import java.io.InputStream;
    import java.util.Scanner;
    import org.inmods.routeplanner.model.City;
    import org.inmods.routeplanner.repository.CityRepository;
    import org.inmods.routeplanner.repository.LinkRepository;
    import org.inmods.routeplanner.model.Link;
    import org.inmods.routeplanner.model.Link.TransportMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Profile;
    import org.springframework.data.geo.Point;
    import org.springframework.stereotype.Component;
    
    /**
     * Fills the city and link repositories from tab-separated resource files
     * when the application starts and no cities are stored yet.
     *
     * The resource paths are taken from the properties routeplanner.cities and
     * routeplanner.links. The loader is skipped when the profile "mocktest" is active.
     */
    @Component
    @Profile("!mocktest")
    public class DatabaseLoader implements CommandLineRunner {
    
      private static final Logger logger = LoggerFactory.getLogger(DatabaseLoader.class);
    
      /** Minimum number of columns of a city record: name, country, population, latitude, longitude. */
      private static final int CITY_COLUMNS = 5;
    
      @Autowired
      private CityRepository cityRepository;
      @Autowired
      private LinkRepository linkRepository;
    
      @Value("${routeplanner.cities}")
      String cityfile;
      @Value("${routeplanner.links}")
      String linkfile;
    
      /**
       * Loads cities and links if the city repository is empty and logs the totals.
       *
       * @param strings command line arguments (unused)
       */
      @Override
      public void run(String... strings) {
        long numCities = cityRepository.count();
        long numLinks = linkRepository.count();
        if (numCities == 0) {
          numCities = loadCities();
          numLinks = loadLinks();
        }
        logger.info("Total {} cities {} links", numCities, numLinks);
      }
    
      /**
       * Opens a classpath resource.
       *
       * @param path resource path as configured
       * @return the opened stream, never null
       * @throws IllegalStateException if the resource does not exist
       */
      private InputStream openResource(String path) {
        InputStream in = this.getClass().getResourceAsStream(path);
        if (in == null) {
          throw new IllegalStateException("resource not found: " + path);
        }
        return in;
      }
    
      /**
       * Reads the city file; columns are name, country, population, latitude, longitude.
       * Malformed records are logged and skipped instead of aborting the startup.
       *
       * @return number of cities stored
       */
      private int loadCities() {
        int nrecs = 0;
        int lineNo = 0;
        // closing the scanner also closes the underlying resource stream
        try (Scanner sc = new Scanner(openResource(cityfile), "UTF-8")) {
          while (sc.hasNextLine()) {
            lineNo++;
            String[] recs = sc.nextLine().split("\t");
            if (recs.length < CITY_COLUMNS) {
              logger.warn("illegal city record at line {}", lineNo);
              continue;
            }
            try {
              City c = new City();
              c.setName(recs[0].strip());
              c.setCountry(recs[1].strip());
              c.setPopulation(Integer.valueOf(recs[2].strip()));
              // Point takes (x, y) = (longitude, latitude); the file lists latitude first
              c.setLocation(new Point(Double.parseDouble(recs[4]), Double.parseDouble(recs[3])));
              cityRepository.save(c);
              nrecs++;
            } catch (NumberFormatException e) {
              logger.warn("illegal number in city record at line {}: {}", lineNo, e.getMessage());
            }
          }
        }
        return nrecs;
      }
    
      /**
       * Reads the link file; each record names the two cities of a link.
       * The weight of a link is the distance between its cities, the mode is TRAIN.
       * Records with unknown cities or too few columns are logged and skipped.
       *
       * @return number of links stored
       */
      private int loadLinks() {
        int nrecs = 0;
        int lineNo = 0;
        try (Scanner sc = new Scanner(openResource(linkfile), "UTF-8")) {
          while (sc.hasNextLine()) {
            lineNo++;
            String[] recs = sc.nextLine().split("\t");
            if (recs.length < 2) {
              logger.warn("illegal record at line {}", lineNo);
            } else {
              try {
                City a = cityRepository.findByName(recs[0].strip()).get(0);
                City b = cityRepository.findByName(recs[1].strip()).get(0);
                Double dist = a.distance(b);
                Link l = new Link(a.getName(), b.getName(), dist, TransportMode.TRAIN);
                linkRepository.save(l);
                nrecs++;
              } catch (IndexOutOfBoundsException e) {
                // findByName returned an empty list for one of the two names
                logger.warn("City not found in {} {}", recs[0], recs[1]);
              }
            }
          }
        }
        return nrecs;
      }
    
    }
    
  13. Add the constructors to the Link class:

    /** Creates an empty link; all fields stay unset. */
    public Link(){}
    
    /**
     * Creates a link between two cities; the id is left unset.
     *
     * @param from  name of the city at one end
     * @param to    name of the city at the other end
     * @param dist  stored as the weight of the link
     * @param tmode transport mode serving the link
     */
    public Link(String from, String to,
        Double dist, TransportMode tmode) {
      this.from = from;
      this.to = to;
      this.weight = dist;
      this.transportMode = tmode;
    }
    
  14. Create the method distance in City class:

    /**
     * Great-circle distance between this city and another one, computed with
     * the spherical law of cosines on a sphere of radius 6371.007176 km.
     *
     * @param to the other city; both cities must have a location
     * @return distance in km, 0.0 for identical coordinates
     */
    public double distance(City to){
      if (this.location.getY() == to.location.getY()
          && this.location.getX() == to.location.getX()) {
        return 0.0;
      }
      // convert coordinates into radian
      double k = Math.PI/180;
      double lat1 = this.location.getY()*k; // Latitude
      double lon1 = this.location.getX()*k; // Longitude
      double lat2 = to.location.getY()*k;   // Latitude
      double lon2 = to.location.getX()*k;   // Longitude
    
      double cosAngle = Math.sin(lat1)*Math.sin(lat2)
          + Math.cos(lat1)*Math.cos(lat2)*Math.cos(lon1-lon2);
      // rounding can push the cosine slightly outside [-1, 1] (nearly identical
      // or nearly antipodal points), where acos would return NaN
      cosAngle = Math.max(-1.0, Math.min(1.0, cosAngle));
      double radius = 6371.007176; // in km
      return radius*Math.acos(cosAngle);
    }
    
  15. Add the files cities.txt and links.txt to the resources directory.
  16. Add the following lines to application.properties:

    routeplanner.cities=/cities.txt
    routeplanner.links=/links.txt
    
  17. Start redis: docker run --rm -p 6379:6379 redis:7
  18. Start RoutePlannerApplication
  19. Add the package org.inmods.routeplanner.service
  20. Create the class RouteService

    package org.inmods.routeplanner.service;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;
    import org.inmods.routeplanner.error.CityNotFoundException;
    import org.inmods.routeplanner.repository.CityRepository;
    import org.inmods.routeplanner.model.City;
    import org.inmods.routeplanner.model.Link;
    import org.inmods.routeplanner.repository.LinkRepository;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.geo.Distance;
    import org.springframework.data.geo.Metrics;
    import org.springframework.data.geo.Point;
    import org.springframework.stereotype.Service;
    
    /**
     * Route queries on top of the city and link repositories: shortest path
     * between two cities and lookups of cities and countries.
     *
     * @author Ronald Tanner
     */
    @Service
    public class RouteService {
    
      Logger logger = LoggerFactory.getLogger((RouteService.class));
      @Autowired
      CityRepository cityRepository;
      @Autowired
      LinkRepository linkRepository;
    
      /**
       * find all links for which isIncludedIn accepts the given cities
       *
       * @param cities cities that delimit the search area
       * @return matching links
       */
      List<Link> findAllLinks(List<City> cities) {
        return StreamSupport.stream(linkRepository.findAll().spliterator(), false).filter(
            l -> l.isIncludedIn(cities)).collect(Collectors.toList());
      }
    
      /**
       * find shortest path from source to target city.
       *
       * Only cities returned by findCitiesBetween and the links accepted for them are
       * searched. Both names are stripped once and used in that form for all lookups.
       *
       * @param fromName name of the source city
       * @param toName   name of the target city
       * @return itinerary as ordered links with ids "0", "1", ...; empty if a name is
       *         null, no links are available or the target cannot be reached
       * @throws CityNotFoundException if one of the cities is unknown
       * @see <a href="https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm">Dijkstra's algorithm</a>
       */
      public List<Link> findShortestPath(String fromName, String toName) {
        if (fromName == null || toName == null) {
          return new ArrayList<>();
        }
        String fromKey = fromName.strip();
        String toKey = toName.strip();
        List<City> cities = findCitiesBetween(fromKey, toKey);
        List<Link> links = findAllLinks(cities);
        logger.info("have {} cities, {} links between {} and {}",
            cities.size(), links.size(), fromKey, toKey);
        if (links.isEmpty()) {
          return new ArrayList<>();
        }
        City source = findCity(fromKey);
        City target = findCity(toKey);
        Map<City, City> previous = computePredecessors(source, cities, links);
    
        // walk back from the target; the chain ends at a city without predecessor
        LinkedList<City> citiesOnPath = new LinkedList<>();
        City u = target;
        while (previous.get(u) != null) {
          citiesOnPath.addFirst(u);
          u = previous.get(u);
        }
        citiesOnPath.addFirst(source);
        return buildItinerary(citiesOnPath, links);
      }
    
      /**
       * Dijkstra search over the given cities: for every reached city the city
       * preceding it on the shortest path from source is recorded.
       */
      private Map<City, City> computePredecessors(City source, List<City> cities, List<Link> links) {
        List<City> queue = new ArrayList<>(cities);
        Map<City, Double> dist = new HashMap<>();
        Map<City, City> previous = new HashMap<>();
        for (City v : cities) {
          dist.put(v, Double.MAX_VALUE);
          previous.put(v, null);
        }
        dist.put(source, 0.);
        while (!queue.isEmpty()) {
          // find city u with shortest dist
          City u = null;
          double minDist = Double.MAX_VALUE;
          for (City c : queue) {
            double d = dist.get(c);
            if (d < minDist) {
              u = c;
              minDist = d;
            }
          }
          if (u == null) {
            // all remaining cities are unreachable
            break;
          }
          queue.remove(u);
          for (City n : findNeighbors(links, u)) {
            double alt = minDist + getDist(links, u, n);
            // a neighbor may lie outside the searched cities and have no entry yet
            if (!dist.containsKey(n) || alt < dist.get(n)) {
              dist.put(n, alt);
              previous.put(n, u);
            }
          }
        }
        return previous;
      }
    
      /** Turns the ordered cities of a path into links and numbers them starting at 0. */
      private List<Link> buildItinerary(List<City> citiesOnPath, List<Link> links) {
        List<Link> itinerary = new ArrayList<>();
        City from = null;
        for (City to : citiesOnPath) {
          if (from != null) {
            itinerary.add(findRoute(from, to, links));
          }
          from = to;
        }
        // set link id:
        int i = 0;
        for (Link l : itinerary) {
          l.setId(Integer.toString(i));
          i++;
        }
        return itinerary;
      }
    
      /** Weight of the link connecting a and b in either direction, Double.MAX_VALUE if there is none. */
      private Double getDist(List<Link> links, City a, City b) {
        for (Link r : links) {
          if (connects(r, a, b)) {
            return r.getWeight();
          }
        }
        return Double.MAX_VALUE;
      }
    
      /** New link oriented from -> to copied from the stored link between both cities, null if there is none. */
      private Link findRoute(City from, City to, List<Link> links) {
        for (Link r : links) {
          if (connects(r, from, to)) {
            return new Link(from.getName(), to.getName(), r.getWeight(), r.getTransportMode());
          }
        }
        return null;
      }
    
      /** True if the link joins the two cities, regardless of direction. */
      private boolean connects(Link r, City a, City b) {
        return (a.getName().equals(r.getFrom()) && b.getName().equals(r.getTo()))
            || (b.getName().equals(r.getFrom()) && a.getName().equals(r.getTo()));
      }
    
      /** Cities at the other end of every link that starts or ends in c. */
      private List<City> findNeighbors(List<Link> links, City c) {
        List<City> neighbors = new ArrayList<>();
        for (Link r : links) {
          if (c.getName().equals(r.getFrom())) {
            neighbors.add(findCity(r.getTo()));
          } else if (c.getName().equals(r.getTo())) {
            neighbors.add(findCity(r.getFrom()));
          }
        }
        return neighbors;
      }
    
      /**
       * find city by name
       *
       * @param name of city
       * @return the first city stored under this name
       * @throws CityNotFoundException if no city has this name
       */
      public City findCity(String name) {
        try {
          return cityRepository.findByName(name).get(0);
        } catch (IndexOutOfBoundsException ex) {
          throw new CityNotFoundException(name);
        }
      }
    
      /**
       * find all cities near the midpoint of 2 cities, using the distance
       * between the 2 cities as search radius
       *
       * @param fromName name of starting city
       * @param toName   name of destination city
       * @return cities between from city and to city, empty if a name is null
       * @throws CityNotFoundException if one of the cities is unknown
       */
      List<City> findCitiesBetween(String fromName, String toName) {
        if (fromName == null || toName == null) {
          return new ArrayList<>();
        }
        City from = findCity(fromName);
        City to = findCity(toName);
        Distance dist = new Distance(from.distance(to), Metrics.KILOMETERS);
        Point center = new Point((to.getLocation().getX() + from.getLocation().getX()) / 2,
            (to.getLocation().getY() + from.getLocation().getY()) / 2);
        return cityRepository.findByLocationNear(center, dist);
      }
    
      /**
       * find linked cities of a country
       *
       * @param country name of the country
       * @return names of the cities in country that are an end of at least one link
       */
      public Iterable<String> findCities(String country) {
        Set<String> cities = new HashSet<>();
        for (Link l : linkRepository.findAll()) {
          for (City c : List.of(findCity(l.getFrom()), findCity(l.getTo()))) {
            if (c.getCountry().equals(country)) {
              cities.add(c.getName());
            }
          }
        }
        return cities;
      }
    
      /**
       * find all cities of country
       *
       * @param country name of the country, must not be null
       * @return cities
       */
      public Iterable<City> findAllCities(String country){
        // a sequential stream suffices for a simple filter over the repository
        return StreamSupport.stream(cityRepository.findAll().spliterator(), false)
            .filter(c -> country.equals(c.getCountry())).collect(Collectors.toList());
      }
    
      /**
       * find all countries with linked cities
       *
       * @return names of all countries that contain an end of at least one link
       */
      public Iterable<String> findCountries() {
        Set<String> countries = new HashSet<>();
        for (Link l : linkRepository.findAll()) {
          for (City c : List.of(findCity(l.getFrom()), findCity(l.getTo()))) {
            countries.add(c.getCountry());
          }
        }
        return countries;
      }
    }
    
  21. add the method isIncludedIn to the class Link:

    /**
      * check if both ends of this link are included in the array of cities
      *
      * The previous implementation already returned true when only one end
      * matched, which contradicted this contract.
      *
      * @param cities cities to search, compared by name
      * @return true if both the from city and the to city of this link are in cities
      */
    public boolean isIncludedIn(List<City> cities) {
      boolean hasFrom = false;
      boolean hasTo = false;
      for (City c : cities) {
        String name = c.getName();
        if (name == null) {
          continue;
        }
        hasFrom = hasFrom || name.equals(from);
        hasTo = hasTo || name.equals(to);
        if (hasFrom && hasTo) {
          return true;
        }
      }
      return false;
    }
    
  22. Create the package org.inmods.routeplanner.error and the class CityNotFoundException with the following content:

    package org.inmods.routeplanner.error;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.ResponseStatus;
    
    /**
     * Thrown when a city name cannot be resolved.
     * The annotation maps the exception to HTTP status 404 with the reason "No such City".
     */
    @ResponseStatus(value= HttpStatus.NOT_FOUND, reason="No such City")
    public class CityNotFoundException extends RuntimeException{
        /**
         * @param name name of the city that was not found; it becomes part of the exception message
         */
        public CityNotFoundException(String name){
        super("City \"" + name + "\"");
      }
    }
    
  23. Add package org.inmods.routeplanner.controller
  24. Create class RouteController

    package org.inmods.routeplanner.controller;
    /**
     * REST endpoints of the route planner; every request is delegated to RouteService.
     */
    @RestController
    public class RouteController {
    
      @Autowired
      RouteService routeService;
    
      /**
       * @param from name of the start city
       * @param to   name of the destination city
       * @return links of the shortest route, empty if there is none
       */
      @GetMapping("/route/{from}/{to}")
      public List<Link> findShortestRoute(@PathVariable String from, @PathVariable String to){
        return routeService.findShortestPath(from, to);
      }
    
      /**
       * @param name name of the city
       * @return the city with this name
       */
      @GetMapping("/city/{name}")
      public City findCity(@PathVariable String name){
        return routeService.findCity(name);
      }
    
      /**
       * The path needs the {country} template variable; without it the
       * path variable cannot be bound and /cities/&lt;country&gt; is not matched.
       *
       * @param country name of the country
       * @return names of the linked cities of the country
       */
      @GetMapping("/cities/{country}")
      public Iterable<String> getCities(@PathVariable String country){
        return routeService.findCities(country);
      }
    
      /**
       * @return names of all countries with linked cities
       */
      @GetMapping("/countries")
      public Iterable<String> getCountries(){
        return routeService.findCountries();
      }
    }
    

3. Fleet Manager

The Fleet Manager allocates vehicles and assigns delivery jobs. This is a Python project that exposes its service via gRPC.

  1. create the module directories in the project directory

    mkdir  -p fleet-manager/{etc,tests}
    
  2. Add the file requirements.txt with following content to the fleet-manager directory

    grpcio
    grpcio-tools
    pyproj
    pyzmq
    pyaml
    kafka-python
    
  3. Create a new python virtual environment inmods-venv, activate it and install the requirements:

    python -m venv inmods-venv
    source inmods-venv/bin/activate
    pip install -r fleet-manager/requirements.txt
    
  4. create the config file config.yaml with following content in the directory fleet-manager/etc:

    # address of the route-planner REST service
    route_planner:
      host: 'route-planner'
      port: 8080
    # address of the event streaming (Kafka) bootstrap server
    event_streamer:
      host: 'event-streamer'
      port: 9092
    # initial fleet: one entry per vehicle
    vehicles:
      # city:  name of the city where the vehicle starts
      # speed: multiplied by elapsed seconds and a scale factor to get the
      #        distance travelled in metres - presumably m/s, confirm
      - city: Basel
        speed: 20
      - city: Aarau
        speed: 16
    
  5. Add the file manager.py to the directory fleet-manager:

    import logging
    import pyproj   #  generic coordinate transformation of geospatial coordinates
    from vehicle import Vehicle
    import time
    import zmq
    import uuid
    import threading
    from urllib.error import HTTPError, URLError
    
    # geodesic calculations on the WGS84 ellipsoid
    geod = pyproj.Geod(ellps='WGS84')
    # in-process endpoint on which vehicles report their events
    event_url = "inproc://events"
    # process-wide shared context; instance() is a classmethod, so no
    # throw-away Context() has to be created first
    context = zmq.Context.instance()
    
    
    def get_waypts(a, b):
        """Return a list of (lon/lat) pairs leading from a towards b.

        The list starts with a, followed by 1 + int(distance / 1000) points
        computed by geod.npts between a and b (roughly km spaced).

        Args:
          a: start location as (lon, lat)
          b: end location as (lon, lat)
        """
        *_, length = geod.inv(*a, *b)
        count = int(length/1000) + 1
        waypoints = [a]
        waypoints.extend(geod.npts(*a, *b, count))
        return waypoints
    
    
    class FleetManager(object):
        """Keeps track of the vehicles and assigns delivery jobs to them.

        Vehicles report events to a PULL socket bound to event_url; the
        events are logged by a background thread started with start().
        """
    
        def __init__(self, route_planner, kafka_host=None):
            """
            Args:
              route_planner: client providing get_route() and get_locations()
              kafka_host: dict with 'host' and 'port' of the event streamer,
                          passed on to every vehicle; empty/None disables it
            """
            self.route_planner = route_planner
            self.event_receiver = context.socket(zmq.PULL)
            self.thread = 0
            self.vehicles = {}
            # None instead of a shared mutable {} default
            self.kafka_host = kafka_host if kafka_host is not None else {}
    
        def add_vehicle(self, speed, city):
            """create a vehicle located in city

            The location lookup is retried up to 5 times, one second apart.
            Returns the id of the new vehicle, or 0 if the location of the
            city could not be obtained."""
            loc = None
            for i in range(5):
                try:
                    loc = self.route_planner.get_locations([city])[0]
                    break
                except (HTTPError, URLError) as ex:
                    logging.warning("%s", ex)
                time.sleep(1)
            if loc is None:
                return 0
            v = Vehicle(str(uuid.uuid4())[:8],
                        speed,
                        loc,
                        event_url, self.kafka_host)
            self.vehicles[v.id] = dict(vehicle=v, city=city)
            return v.id
    
        def set_job(self, fromcity, tocity):
            """assigns delivery job to vehicle closest to fromcity

            Only vehicles that are not running are considered. A vehicle
            already located in fromcity is taken immediately.
            Returns dict(vehicle=id, dist=..., pickup=...) or {} if no
            vehicle or no route is available."""
            try:
                cand = []
                for entry in self.vehicles.values():
                    if entry['vehicle'].is_running:
                        continue
                    if entry['city'] == fromcity:
                        cand = [(entry, dict(itinerary=[], total=0))]
                        break
                    pickup = self.route_planner.get_route(entry['city'], fromcity)
                    if pickup:
                        cand.append((entry, pickup))
                if not cand:
                    logging.warning("no idle vehicle can reach %s", fromcity)
                    return {}
    
                r = self.route_planner.get_route(fromcity, tocity)
                if not r:
                    logging.warning("no route found from %s to %s", fromcity, tocity)
                    return {}
    
                # shortest pickup route wins; ties keep the first candidate
                v, pickup = min(cand, key=lambda k: k[1]['total'])
                ity = self.route_planner.get_locations(pickup['itinerary'] + r['itinerary'])
                logging.info("  itinerary %s", ity)
                wp = [x for y in zip(ity, ity[1:])
                      for x in get_waypts(y[0], y[1])]
    
                v['vehicle'].start(wp + [ity[-1]])
    
                v['city'] = tocity
                return dict(vehicle=v['vehicle'].id,
                            dist=r['total'], pickup=pickup['total'])
            except Exception:
                logging.exception("no route found")
            return {}
    
        def start(self):
            """bind the event socket and start logging events in the background"""
            self.event_receiver.bind(event_url)
            self.thread = threading.Thread(target=self.handle_events,
                                           daemon=True)
            self.thread.start()
    
        def stop(self):
            """stop all running vehicles"""
            for k in self.vehicles.keys():
                self.vehicles[k]['vehicle'].stop()
            self.event_receiver.close()
    
        def handle_events(self):
            """log incoming events until the manager is told to terminate"""
            while self.thread:
                try:
                    msg = self.event_receiver.recv()
                except zmq.ZMQError:
                    # the socket was closed by stop()
                    break
                logging.info(msg)
            # clean up
            self.stop()
    
        def ready(self):
            return "Yes"
    
        def join(self):
            """wait for the event thread; Ctrl-C ends the wait

            Clearing self.thread afterwards lets handle_events leave its
            loop. The thread is not joined a second time after an interrupt,
            because it only ends once self.thread has been cleared."""
            if not self.thread:
                return
            try:
                self.thread.join()
            except KeyboardInterrupt:
                logging.info("Terminate")
            self.thread = 0
    
  6. Add the file vehicle.py to the directory fleet-manager (manager.py imports it with "from vehicle import Vehicle"):

    import time
    import logging
    import threading
    import json
    import zmq
    import kafka
    
    # process-wide shared context; instance() is a classmethod, so no
    # throw-away Context() has to be created first
    context = zmq.Context.instance()
    
    class Vehicle(object):
        """Simulated vehicle that travels along an itinerary in its own thread.

        Every position is pushed as JSON to sink_url and, if an event
        streamer is configured and reachable, sent to the Kafka topic
        'vehicle-positions'.
        """
    
        def __init__(self, id, speed, location, sink_url, event_streamer=None):
            """
            Args:
              id: identifier of the vehicle
              speed: multiplied by elapsed seconds and scale_factor to get
                     the distance travelled
              location: current position
              sink_url: zmq endpoint that receives the events
              event_streamer: dict with optional 'host' (default localhost)
                     and 'port' (default 9092); empty/None disables streaming
            """
            # None instead of a shared mutable {} default
            event_streamer = event_streamer or {}
            self.id = id
            self.location = location
            self.speed = speed
            self.departure = 0
            self.scale_factor = 25
            self.sink_url = sink_url
            self.sender = context.socket(zmq.PUSH)
            self.sender.connect(self.sink_url)
            self.is_running = False
            self.topic = False
            self.thread = None
            self.producer = None
            if event_streamer:
                host = event_streamer.get('host') or 'localhost'
                port = event_streamer.get('port') or 9092
                servers = f'{host}:{port}'
                # the broker may still be starting: try 5 times, 1 s apart
                for i in range(5):
                    try:
                        self.producer = kafka.KafkaProducer(
                            bootstrap_servers=servers,
                            value_serializer=lambda v: json.dumps(v).encode())
                        self.topic = 'vehicle-positions'
                        logging.info('connected to bootstrap_servers=%s', servers)
                        self._publish_location()
                        break
                    except kafka.errors.NoBrokersAvailable:
                        pass
                    time.sleep(1)
    
            if not self.topic:
                logging.warning("no event streaming %s", event_streamer)
    
        def start(self, itinerary):
            """set itinerary (list of ordered waypoints) and start voyage"""
            self.thread = threading.Thread(target=self.run, args=(itinerary,),
                                           daemon=True)
            self.is_running = True
            self.thread.start()
    
        def stop(self):
            """end the voyage and close the event socket

            Waits for the voyage thread to finish first, so that the socket
            is not closed while that thread is still sending."""
            self.is_running = False
            t = self.thread
            if t is not None and t.is_alive() and t is not threading.current_thread():
                t.join()
            self.sender.close()
    
        def _publish_location(self):
            """send id and current location to Kafka (if connected) and to the sink"""
            msg = dict(location=self.location,
                       id=self.id)
            logging.info("topic %s msg %s", self.topic, msg)
            if self.topic:
                self.producer.send(self.topic, msg)
            self.sender.send_json(msg)
    
        def run(self, itinerary):
            """calculates distance and moves to new position
            returns if end of itinerary is reached

            The travelled distance in km selects the waypoint. When the
            end is reached the last waypoint is reported if it was skipped.
            Finally the vehicle id is sent to the sink as a plain string.
            """
            s = 0
            self.departure = time.time()
            time_step = 0
            if self.speed <= 0:
                # a vehicle that cannot move would divide by zero below
                logging.warning("Vehicle %s has no positive speed", self.id)
                self.is_running = False
            else:
                time_step = 1000/self.speed/self.scale_factor
            while self.is_running:
                dt = time.time() - self.departure
                s = self.speed * dt * self.scale_factor
                idx = int(round(s/1000))
                if idx >= len(itinerary):
                    if itinerary and self.location != itinerary[-1]:
                        self.location = itinerary[-1]
                        self._publish_location()
                    self.is_running = False
                    break
                self.location = itinerary[idx]
                self._publish_location()
                time.sleep(time_step)
    
            logging.info("Vehicle {0} stopped: pos {1} dist {2:5.1f}".format(
                        self.id, self.location, s/1000))
            self.departure = 0
            self.sender.send_string("{}".format(self.id))
    
        def join(self):
            """wait for the end of the voyage; returns at once if none was started"""
            if self.thread is not None:
                self.thread.join()
    
  7. Add the file routeplanner.py to the directory fleet-manager:

    import logging
    import urllib.parse
    import urllib.request
    import json
    
    class RoutePlanner(object):
        """HTTP client of the route-planner REST service."""
    
        def __init__(self, host, port):
            self.url = 'http://{}:{}'.format(host, port)
            logging.info("routeplanner %s", self.url)
    
        def _get_json(self, path):
            """GET the (unquoted) path and return the decoded JSON body

            The path is percent-quoted exactly once here. Returns None if
            the status is not 200; errors raised by urlopen propagate."""
            with urllib.request.urlopen(
                    self.url + urllib.parse.quote(path)) as resp:
                if resp.getcode() == 200:
                    return json.loads(resp.read().decode())
                logging.warning("unexpected status %s for %s",
                                resp.getcode(), path)
            return None
    
        def get_route(self, fromcity, tocity):
            """returns itinerary and total distance for route fromcity to tocity

            The result is dict(itinerary=[city names], total=sum of weights),
            or {} if no route was found."""
            r = self._get_json(f'/route/{fromcity}/{tocity}')
            if not r:
                return {}  # no route found
            try:
                return dict(itinerary=[i['from'] for i in r] + [r[-1]['to']],
                            total=sum([l['weight'] for l in r]))
            except (KeyError, IndexError, TypeError):
                logging.warning("unexpected route data from %s to %s",
                                fromcity, tocity)
            return {}  # no route found
    
        def get_locations(self, itinerary):
            """returns list of (lon,lat) pairs of cities in itinerary"""
            loc = []
            for c in itinerary:
                r = self._get_json(f'/city/{c}')
                if r is not None:
                    loc.append((r['location']['x'], r['location']['y']))
            return loc
    
        def get_cities(self):
            """returns a dict mapping each country to the list of its cities"""
            d = {}
            countries = self._get_json('/countries')
            for c in countries or []:
                # quoted once in _get_json; quoting the name here as well
                # would turn e.g. a space into %2520
                cities = self._get_json(f'/cities/{c}')
                if cities is not None:
                    d[c] = cities
            return d
    
  8. Add the file delivery_job.proto to the directory fleet-manager:

    syntax = "proto3";
    
    import "google/protobuf/empty.proto";
    package inmods;
    
    // Delivery job API of the fleet manager.
    service DeliveryJob {
      // Streams the known cities, one message per city.
      rpc FindCities(google.protobuf.Empty) returns (stream City) {}
      // Creates a delivery job for the given order and returns its confirmation.
      rpc Create(DeliveryOrder) returns (Confirmation) {}
    }
    
    // A city together with the country it belongs to.
    message City {
      string name = 1;
      string country = 2;
    }
    
    // Request for a delivery between two cities, given by name.
    message DeliveryOrder {
      string fromCity = 1;  // city where the delivery starts
      string toCity = 2;    // city where the delivery ends
    }
    
    // Result of creating a delivery job.
    message Confirmation {
      string vehicle = 1;  // id of the vehicle the job was assigned to
      double dist = 2;     // total weight of the route from fromCity to toCity
      double pickup = 3;   // total weight of the route of the vehicle to fromCity
    }
    
  9. Add the file Makefile to the directory fleet-manager:

    # files generated from delivery_job.proto by the gRPC tools
    GENSRCS=delivery_job_pb2.py delivery_job_pb2.pyi delivery_job_pb2_grpc.py
    $(GENSRCS): delivery_job.proto
         python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. \
           --grpc_python_out=. $<
    
    # clean is a command, not a file: run it even if a file named clean exists
    .PHONY: clean
    clean:
         $(RM) $(GENSRCS)
    
  10. Generate the gRPC code:

    make
    
  11. Add the file server.py to the directory fleet-manager:

    import os
    import json
    import yaml
    import logging
    
    from pathlib import Path
    from manager import FleetManager
    from routeplanner import RoutePlanner
    
    import grpc
    from concurrent import futures
    
    import delivery_job_pb2
    import delivery_job_pb2_grpc
    from delivery_job_pb2_grpc import DeliveryJobStub as stub
    from delivery_job_pb2_grpc import DeliveryJobServicer
    
    
    class DeliveryJobServicer(delivery_job_pb2_grpc.DeliveryJobServicer):
        """gRPC implementation of the DeliveryJob service.

        Builds the route planner client and the fleet manager from the
        configuration and adds the configured vehicles.
        """
    
        def __init__(self, config):
            """
            Args:
              config: dict with optional keys 'route_planner' (host, port;
                      default localhost:8080), 'event_streamer' and
                      'vehicles' (list of dicts with 'speed' and 'city')
            """
            route_planner = config.get('route_planner',
                                       {'host': 'localhost', 'port': 8080})
            kafka_host = config.get('event_streamer', {})
    
            self.routePlanner = RoutePlanner(route_planner['host'], route_planner['port'])
            self.fleetManager = FleetManager(self.routePlanner,
                                             kafka_host)
    
            # a missing or empty 'vehicles' entry ends in the warning below
            for v in config.get('vehicles') or []:
                id = self.fleetManager.add_vehicle(v['speed'], v['city'])
                if id:
                    logging.info('Vehicle %s added at %s', id, v['city'])
            if not self.fleetManager.vehicles:
                logging.warning('No vehicles available')
            self.fleetManager.start()
    
        def FindCities(self, request, context):
            """stream one City message per city known to the route planner"""
            countries = self.routePlanner.get_cities()
            for country in countries:
                for cityname in countries[country]:
                    city = delivery_job_pb2.City()
                    city.name = cityname
                    city.country = country
                    yield city
    
        def Create(self, request, context):
            """assign the delivery order to a vehicle

            Returns the Confirmation of the job. If no job could be created
            the call is answered with status NOT_FOUND and an empty
            Confirmation; a gRPC handler must return the declared message
            type, not a (text, status) tuple."""
            job = None
            try:
                job = self.fleetManager.set_job(
                    request.fromCity, request.toCity)
            except Exception:
                logging.exception("no route found")
            if job:
                confirmation = delivery_job_pb2.Confirmation()
                confirmation.vehicle = job['vehicle']
                confirmation.dist = job['dist']
                confirmation.pickup = job['pickup']
                return confirmation
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details("No route found")
            return delivery_job_pb2.Confirmation()
    
    
    def serve(config, port=5000):
        """Run the fleet-manager gRPC server and block until it terminates.

        Args:
            config: configuration passed on to ``DeliveryJobServicer``.
            port: TCP port the insecure endpoint listens on (all interfaces).
        """
        worker_pool = futures.ThreadPoolExecutor(max_workers=10)
        grpc_server = grpc.server(worker_pool)
        servicer = DeliveryJobServicer(config)
        delivery_job_pb2_grpc.add_DeliveryJobServicer_to_server(servicer, grpc_server)

        listen_address = f'[::]:{port}'
        grpc_server.add_insecure_port(listen_address)
        logging.info(f"Server listening on port {port}...")

        grpc_server.start()
        grpc_server.wait_for_termination()
    
    
    if __name__ == '__main__':
        # Configure logging before the first log call: an earlier
        # logging.warning() would install the default handler and turn this
        # basicConfig() into a no-op.
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(message)s')

        test_config = None
        # default location of config file. config-localhost.yaml is used if the
        # environment variable CONFIG_FILE isn't set
        if test_config is None:
            config_file = Path(os.environ.get('CONFIG_FILE', 'config-localhost.yaml'))
            # without a configuration the server cannot be set up: stop with a
            # clear message instead of failing later on an undefined name
            if not config_file.is_file():
                logging.error("config_file %s not found", config_file)
                raise SystemExit(1)
            # safe_load only builds plain YAML types, which is all this file
            # needs; an empty file yields None, hence the fallback to {}
            config = yaml.safe_load(config_file.read_text()) or {}
        else:
            config = test_config
        serve(config)
    
  12. Create the file config-localhost.yaml with the following content in the directory fleet-manager:

    route_planner:
      host: 'localhost'
      port: 8080
    vehicles:
      - city: Basel
        speed: 20
      - city: Aarau
        speed: 16
    
  13. Start fleet-manager

    python ./server.py
    

4. Order Processing

  1. Go to https://start.spring.io and enter the shown values:

    order-processing-spring-starter.png

    Figure 3: Spring initializr for order-processing

    with the dependencies: DevTools, Spring Web, Thymeleaf, Spring Security, Validation, Spring Data JPA, Spring gRPC, H2 Database, Testcontainers

  2. Press the Generate button (Ctrl+Enter) and download the ZIP file order-processing.zip.
  3. Unpack this zip file into the project directory.
  4. Add and commit the created directory

    git add order-processing
    git commit -m "added module"
    
  5. Add the file delivery_job.proto to the directory src/main/proto:

    // Contract between order-processing (client) and fleet-manager (server).
    syntax = "proto3";
    
    // Java code generation: one top-level class per message/service, placed in
    // the order-processing base package.
    option java_multiple_files = true;
    option java_package = "org.inmods.orderprocessing";
    option java_outer_classname = "DeliveryJobProto";
    import "google/protobuf/empty.proto";
    package inmods;
    
    service DeliveryJob {
     // Streams every city known to the server; takes no input.
     rpc FindCities(google.protobuf.Empty) returns (stream City) {}
     // Requests a delivery job for the given order and returns its confirmation.
     rpc Create(DeliveryOrder) returns (Confirmation) {}
    }
    
    // A city together with the country it belongs to.
    message City {
     string name = 1;
     string country = 2;
    }
    
    // Start and end city of a requested delivery.
    message DeliveryOrder {
     string fromCity = 1;
     string toCity = 2;
    }
    
    // Result of a created delivery job.
    message Confirmation {
     // identifier of the vehicle assigned to the job
     string vehicle = 1;
     // distance of the job (shown as "Dist / km" by order-processing)
     double dist = 2;
     // pickup value reported by the fleet manager
     // NOTE(review): unit (time vs. distance) is not visible here - confirm
     double pickup = 3;
    }
    
  6. Open your Java IDE (Intellij, Eclipse, VisualCode, Netbeans) and import order-processing files as a Maven project.
  7. Create the package org.inmods.orderprocessing.model
  8. Create the class Order with its getter and setter methods:

    package org.inmods.orderprocessing.model;
    
    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    import javax.persistence.Table;
    
    @Entity
    @Table(name="DeliveryOrder")
    public class Order {
      @Id
      @GeneratedValue(strategy = GenerationType.SEQUENCE)
      private Long id;
      private String fromCity;
      private String toCity;
      private Double distance;
    }
    
  9. Create the package org.inmods.orderprocessing.repository
  10. Create the interface OrderRepository

    package org.inmods.orderprocessing.repository;

    import java.util.List;

    import org.inmods.orderprocessing.model.Order;
    import org.springframework.data.jpa.repository.JpaRepository;

    /** Spring Data repository for {@link Order} entities. */
    public interface OrderRepository extends JpaRepository<Order, Long> {

      /**
       * Finds all orders that start in the given city (derived query).
       *
       * @param fromCity name of the start city
       * @return the matching orders, empty if there are none
       */
      List<Order> findByFromCity(String fromCity);
    }
    
  11. Add the package org.inmods.orderprocessing.service
  12. Create the class OrderService

    package org.inmods.orderprocessing.service;

    import java.util.List;
    import java.util.Optional;

    import org.inmods.orderprocessing.model.Order;
    import org.inmods.orderprocessing.repository.OrderRepository;
    import org.springframework.stereotype.Service;

    /**
     * Service layer between the web controller and the {@link OrderRepository}.
     *
     * <p>Offers exactly the operations the OrderController calls:
     * {@code findAll()}, {@code findById(Long)} and {@code save(Order)}.
     */
    @Service
    public class OrderService {
      private final OrderRepository orderRepository;

      public OrderService(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
      }

      /** @return all stored orders, empty if there are none */
      public List<Order> findAll() {
        return orderRepository.findAll();
      }

      /**
       * @param id primary key of the order
       * @return the order, or an empty Optional if no order has this id
       */
      public Optional<Order> findById(Long id) {
        return orderRepository.findById(id);
      }

      /**
       * Inserts a new order or updates an existing one.
       *
       * @param order the order to store
       * @return the stored order
       */
      public Order save(Order order) {
        return orderRepository.save(order);
      }
    }
    
  13. Add the package org.inmods.orderprocessing.controller
  14. Create the class OrderController:

    package org.inmods.orderprocessing.controller;
    
    import java.util.List;
    import java.util.Optional;
    
    import org.inmods.orderprocessing.Confirmation;
    import org.inmods.orderprocessing.DeliveryJobGrpc;
    import org.inmods.orderprocessing.DeliveryOrder;
    import org.inmods.orderprocessing.model.Order;
    import org.inmods.orderprocessing.service.OrderService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Controller;
    import org.springframework.ui.Model;
    import org.springframework.validation.BindingResult;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.client.RestClientResponseException;
    import org.springframework.web.servlet.ModelAndView;
    
    import jakarta.servlet.http.HttpServletRequest;
    import jakarta.validation.Valid;
    
    /**
     * Web controller for listing, creating, editing and starting delivery orders.
     *
     * <p>Starting an order sends it to the fleet manager through the injected
     * gRPC blocking stub and stores the confirmed distance.
     */
    @Controller
    @RequestMapping("/")
    public class OrderController {
        private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
        @Autowired
        OrderService orderService;
        @Autowired
        DeliveryJobGrpc.DeliveryJobBlockingStub stub;

        /** Shows the list of all orders. */
        @RequestMapping
        public String getAllOrders(Model model){
            List<Order> orders = orderService.findAll();
            model.addAttribute("orders", orders);
            return "list-orders";
        }

        /**
         * Stores a submitted order.
         *
         * @return the form view again on validation errors, otherwise a redirect to the list
         */
        @RequestMapping(path = "/create", method = RequestMethod.POST)
        public String create(@Valid Order order, BindingResult result, Model m)
        {
            if(result.hasErrors()) {
                logger.warn("validation error {}", m);
                return "add-edit-order";
            }
            orderService.save(order);
            return "redirect:/";
        }

        /**
         * Opens the order form, empty for a new order or filled for an existing one.
         *
         * @param id id of the order to edit; absent for {@code /edit}
         * @return the form view, or a redirect to the list if the id is unknown
         */
        @RequestMapping(path = {"/edit", "/edit/{id}"})
        public String edit(Model model, @PathVariable("id") Optional<Long> id)
        {
            if (id.isEmpty()) {
                model.addAttribute("order", new Order());
                return "add-edit-order";
            }
            Optional<Order> order = orderService.findById(id.get());
            if (order.isEmpty()) {
                // a stale or mistyped link must not end in NoSuchElementException
                logger.warn("order {} not found", id.get());
                return "redirect:/";
            }
            model.addAttribute("order", order.get());
            return "add-edit-order";
        }

        /**
         * Requests a delivery job for the order and stores the confirmed distance.
         *
         * @param id id of the order to start
         * @return a redirect to the list
         */
        @RequestMapping("/start/{id}")
        public String start(Model model, @PathVariable("id") Long id){
            Optional<Order> found = orderService.findById(id);
            if (found.isEmpty()) {
                logger.warn("order {} not found", id);
                return "redirect:/";
            }
            Order order = found.get();
            DeliveryOrder request = DeliveryOrder.newBuilder()
                            .setFromCity(order.getFromCity())
                            .setToCity(order.getToCity()).build();
            Confirmation confirmation = stub.create(request);

            order.setDistance(confirmation.getDist());
            orderService.save(order);
            // getVehicle() yields the vehicle name; getVehicleBytes().toString()
            // only printed the ByteString object description
            logger.info("Vehicle {} Dist {}", confirmation.getVehicle(), confirmation.getDist());
            return "redirect:/";
        }

        /** Renders the error view for failed REST client calls. */
        @ExceptionHandler(RestClientResponseException.class)
        public ModelAndView handleError(HttpServletRequest req, Exception ex) {
            // ex.toString() keeps the message text identical to the former
            // concatenation while using parameterized logging
            logger.error("Request: {} raised {}", req.getRequestURL(), ex.toString());

            ModelAndView mav = new ModelAndView();
            mav.addObject("exception", ex);
            mav.addObject("url", req.getRequestURL());
            mav.setViewName("error");
            return mav;
        }

    }
    
  15. Add the file add-edit-order.html to the directory src/main/resources/templates:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    
    <head>
      <meta charset="utf-8">
      <meta http-equiv="x-ua-compatible" content="ie=edge">
      <title>Add Order</title>
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.1.3/css/bootstrap.min.css">
      <link rel="stylesheet" href="https://use.fontawesome.com/releases/v5.4.1/css/all.css">
    </head>
    
    <body>
    <div class="container my-5">
      <h3> Edit Order</h3>
      <div class="card">
        <div class="card-body">
          <div class="col-md-10">
            <!-- Order form: posts the bound order object to /create.
                 Each input id equals the "for" value of its label so that
                 clicking the label focuses the field. -->
            <form action="#" th:action="@{/create}" th:object="${order}"
                  method="post">
              <div class="row">
                <div class="form-group col-md-8">
                  <label for="fromCity" class="col-form-label">From</label>
                  <input type="text" th:field="*{fromCity}" class="form-control"
                         id="fromCity" placeholder="From City" />
                </div>
                <div class="form-group col-md-8">
                  <label for="toCity" class="col-form-label">To</label>
                  <input type="text" th:field="*{toCity}" class="form-control"
                         id="toCity" placeholder="To City" />
                </div>

                <div class="col-md-6">
                  <input type="submit" class="btn btn-primary" value=" Submit ">
                </div>

                <!-- carries the id of an edited order; empty for a new order -->
                <input type="hidden" id="id" th:field="*{id}">

              </div>
            </form>
          </div>
        </div>
      </div>
    </div>
    </body>
    
    </html>
    
  16. Add the file list-orders.html to the directory src/main/resources/templates:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    
    <head>
      <meta charset="utf-8">
      <meta http-equiv="x-ua-compatible" content="ie=edge">
      <title>All Orders</title>
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.1.3/css/bootstrap.min.css">
      <link rel="stylesheet" href="https://use.fontawesome.com/releases/v5.4.1/css/all.css">
    </head>
    
    <body>
    <div class="container my-2">
      <form th:action="@{/logout}" method="post">
          <input type="submit" value="Sign Out"/>
      </form>
      <div class="card">
        <div class="card-body">
          <div class="container my-5">
            <p class="my-5">
              <!-- th:href resolves the link against the application context path -->
              <a href="/edit" th:href="@{/edit}" class="btn btn-primary">
                <i class="fas fa-plus ml-2"> Add Order </i></a>
            </p>
            <div class="col-md-10">
              <!-- The controller passes a list, so a null check never matches
                   when there are no orders; test for an empty (or null) list. -->
              <h2 th:if="${#lists.isEmpty(orders)}">No record found !!</h2>
              <div th:unless="${#lists.isEmpty(orders)}">
                <table class="table table-striped table-responsive-md">
                  <thead>
                  <tr>
                    <th>From</th>
                    <th>To</th>
                    <th>Dist / km</th>
                    <th>Edit</th>
                    <th>Start</th>
                  </tr>
                  </thead>
                  <tbody>
                  <tr th:each="order : ${orders}">
                    <td th:text="${order.fromCity}"></td>
                    <td th:text="${order.toCity}"></td>
                    <td th:text="${#numbers.formatDecimal(order.distance, 0, 1)}"></td>
                    <td>
                      <!-- editing and starting are offered only while no distance is set -->
                      <div th:if="${order.distance} == null">
                      <a th:href="@{/edit/{id}(id=${order.id})}"
                         class="btn btn-primary">
                        <i class="fas fa-edit ml-2"></i>
                      </a>
                      </div>
                    </td>
                    <td>
                      <div th:if="${order.distance} == null">
                      <a th:href="@{/start/{id}(id=${order.id})}"
                         class="btn btn-primary">
                        <i class="fas fa-check ml-2"></i>
                      </a>
                      </div>
                    </td>
                  </tr>
                  </tbody>
                </table>
              </div>

            </div>
          </div>
        </div>
      </div>
    </div>
    </body>
    
    </html>
    
  17. Add following lines in src/main/resources/application.properties:

    # credentials of the single user created by Spring Security (development only)
    spring.security.user.name=admin
    spring.security.user.password=aiPaigh4
    # address of the fleet-manager gRPC server; 0.0.0.0 is a wildcard bind
    # address and not a portable connect target, so name the host explicitly
    spring.grpc.client.channels.fleet-manager.address=localhost:5000
    
  18. Add the following lines to the application class DeliveryOrderProcessingApplication (the class annotated with @SpringBootApplication):

    /**
     * Provides the blocking gRPC client stub that OrderController injects.
     *
     * <p>The channel name "fleet-manager" is the same name used in the
     * property spring.grpc.client.channels.fleet-manager.address.
     */
    @Bean
    DeliveryJobGrpc.DeliveryJobBlockingStub stub(GrpcChannelFactory channels) {
            return DeliveryJobGrpc.newBlockingStub(channels.createChannel("fleet-manager"));
    }
    
  19. Start DeliveryOrderProcessingApplication and enter your orders:

    order-processing.png

    Figure 4: Order Processing Main Window

5. Event Streaming

Create the directory event-streaming and add the file docker-compose.yaml (for details see https://hub.docker.com/r/apache/kafka):

# Single-node Kafka used as the event streaming platform.
services:
  event-streamer:
    image: apache/kafka:4.0.0
    ports:
      # client port published to the host
      - '9092:9092'
    environment:
      - KAFKA_NODE_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      # this one node acts as broker and as controller
      - KAFKA_PROCESS_ROLES=broker,controller
      # 9092 serves clients, 9093 is the controller listener
      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # address handed out to clients
      # NOTE(review): "localhost" only suits clients running on the docker host - confirm for containerized clients
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # node id 1 (see KAFKA_NODE_ID) is the only quorum voter
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@event-streamer:9093

6. Delivery Tracking

  1. create directories delivery-tracking/etc, delivery-tracking/panel, delivery-tracking/panel/templates

    mkdir -p delivery-tracking/{etc,panel,panel/templates}
    

    NOTE: this command relies on brace expansion and therefore works only in shells that support it (such as bash or zsh). The following commands create the same directory structure in any shell:

    mkdir delivery-tracking/
    mkdir delivery-tracking/etc
    mkdir delivery-tracking/panel
    mkdir delivery-tracking/panel/templates
    
  2. add file config.ini with following content to directory delivery-tracking/etc

    [event-streamer]
    host=event-streamer
    port=9092
    
  3. add file __init__.py to the directory delivery-tracking/panel

    from flask import Flask, Response
    from flask import render_template
    import os
    import json
    from configparser import ConfigParser
    from pathlib import Path
    from .monitor import VehicleMonitor
    import logging
    
    # Module-level setup: runs once when the package "panel" is imported.
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(message)s')
    
    # Initialize the configuration parser with all existing environment
    # variables as default values (they are visible in every section).
    config = ConfigParser(os.environ)
    
    # Location of the config file: taken from the environment variable
    # CONFIG_FILE, config-localhost.ini is used if the variable isn't set.
    config_file = Path(os.environ.get('CONFIG_FILE','config-localhost.ini'))
    
    # Verify the file exists before attempting to read it and extend the
    # configuration; a missing file is not an error.
    if config_file.is_file():
        config.read(str(config_file))
    
    # Kafka endpoint; the fallbacks localhost:9092 apply when the section or
    # option is not configured.
    port = config.getint('event-streamer', 'port', fallback=9092)
    host = config.get('event-streamer', 'host', fallback='localhost')
    # Shared by all requests; the connection is opened in create_app().
    monitor = VehicleMonitor(host, port)
    
    def create_app():
        """Application factory for the delivery-tracking panel.

        Registers the map page ('/') and the server-sent-events stream
        ('/positions'), opens the vehicle monitor and returns the Flask app.
        """
        app = Flask(__name__)

        @app.route('/')
        def index():
            # the page with the map
            return render_template('index.html')

        @app.route('/positions')
        def positions():
            def stream():
                # one server-sent event per vehicle position message
                for record in monitor.get_messages():
                    payload = json.loads(record.value)
                    location = payload['location']
                    encoded = json.dumps(
                        {'id': payload['id'], 'lon': location[0], 'lat': location[1]})
                    app.logger.info(encoded)
                    yield f'data:{encoded}\n\n'

            return Response(stream(), mimetype="text/event-stream")

        # connect the monitor while the application context is active
        with app.app_context():
            monitor.open()
        return app
    
  4. add the file monitor.py to the directory delivery-tracking/panel:

    import kafka
    import logging
    import time
    
    
    class VehicleMonitor(object):
        """Reads vehicle position events from the Kafka topic 'vehicle-positions'."""

        def __init__(self, host, port):
            """Remember the broker address; no connection is made yet.

            Args:
                host: host name of the Kafka broker.
                port: port of the Kafka broker.
            """
            self.servers = f'{host}:{port}'
            self.topicName = 'vehicle-positions'
            # stays None until open() succeeded, so callers can test for it
            self.consumer = None

        def open(self):
            """Connect to the broker, retrying up to 5 times with 2 s pauses.

            A failure is logged, not raised; ``consumer`` then stays unset.
            """
            attempts = 5
            for attempt in range(attempts):
                try:
                    self.consumer = kafka.KafkaConsumer(
                        self.topicName,
                        bootstrap_servers=self.servers)
                    logging.info('Vehicle monitor listening on %s', self.servers)
                    return
                except kafka.errors.NoBrokersAvailable:
                    # wait before the next attempt, but not after the last one
                    if attempt < attempts - 1:
                        time.sleep(2)
            logging.error('Vehicle monitor NoBrokersAvailable %s for topic %s',
                          self.servers, self.topicName)

        def get_messages(self):
            """consume events from kafka topic

            Yields each message of the consumer; stops at the first falsy
            message and closes the consumer once iteration has ended. Yields
            nothing when no connection was opened.
            """
            if self.consumer is None:
                logging.error('Vehicle monitor is not connected to %s', self.servers)
                return
            for msg in self.consumer:
                if not msg:
                    break
                yield msg
            self.consumer.close()
    
  5. add the file index.html to the directory delivery-tracking/panel/templates:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Delivery Tracking</title>
    
          <link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
              integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
              crossorigin=""/>
    
         <script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
              integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo="
              crossorigin=""></script>
    </head>
    <body>
      <h1>Delivery Tracking</h1>
      <!-- a div is not a void element: it needs an explicit end tag -->
      <div id="mapid" style="width:900px;height:600px;"></div>
      <script>
        var mymap = L.map('mapid').setView([47.57, 7.58], 8);
        L.tileLayer('https://tile.openstreetmap.org/{z}/{x}/{y}.png', {
        attribution: 'Map data &copy; <a href="https://www.openstreetmap.org/">OpenStreetMap</a>',
        maxZoom: 18
        }).addTo(mymap);

        // Exactly one marker per vehicle id. The marker is moved on every
        // position event, so nothing accumulates while the page stays open.
        var mapMarkers = new Map();
        var source = new EventSource('/positions');
        source.addEventListener('message', function(e) {
            var pos = JSON.parse(e.data);
            console.log(pos);
            var marker = mapMarkers.get(pos.id);
            if (marker) {
                marker.setLatLng([pos.lat, pos.lon]);
            }
            else {
                mapMarkers.set(pos.id, L.marker([pos.lat, pos.lon]).addTo(mymap));
            }
        });
      </script>
    </body>
    </html>
    
  6. activate python environment and start web server:

    source inmods-venv/bin/activate
    flask --app panel run --port 5001
    
  7. open with your web browser: http://localhost:5001

    delivery-tracking.png

    Figure 5: Delivery Tracking Main Window

7. Testing

7.1. Fleet Manager

  1. Install pytest (make sure that inmods-venv is activated):

    $ pip install pytest
    
  2. create the file __init__.py in directory tests:

    $ touch tests/__init__.py
    
  3. add the file test_vehicle.py to the directory tests:

    from vehicle import Vehicle
    import zmq
    import logging
    
    def test_vehicle():
        """Drive a vehicle along a 3-point itinerary and count its events.

        Messages are pulled until one equals the vehicle id; the number of
        messages received before it must equal the itinerary length.
        """
        event_url = "inproc://location-events"
        receiver = zmq.Context.instance().socket(zmq.PULL)
        receiver.bind(event_url)

        itinerary = (
            (7.58, 47.57),
            (7.589388372000748, 47.56500578532769),
            (7.598774959308661, 47.5600107979814),
        )
        vehicle = Vehicle('1', 15.0, (7.58, 47.57), event_url)
        vehicle.start(itinerary)

        received = 0
        while True:
            message = receiver.recv()
            logging.info(message)
            # a message carrying just the vehicle id ends the loop
            if message.decode() == vehicle.id:
                break
            received += 1

        vehicle.join()
        receiver.close()
        assert received == len(itinerary)
    
  4. run pytest

    $ pytest -v
    
  5. create the file test_manager.py in the directory tests:

    import pytest
    from manager import FleetManager
    from routeplanner import RoutePlanner
    
    @pytest.fixture()
    def app():
        """Fleet manager wired to a route planner client at localhost:8080."""
        return FleetManager(RoutePlanner('localhost', 8080))
    
    def test_ready(app):
        """The fleet manager reports "Yes" as its readiness state."""
        state = app.ready()
        assert "Yes" == state
    
    def test_job(app, monkeypatch):
        """A job Basel -> Liestal reports the distance given by the (stubbed) route."""
        coordinates = {'Basel':  (47.57, 7.58),
                       'Liestal': (47.49, 7.73)}

        # stand-ins for the two route planner calls, so no server is needed
        def fake_route(fromcity, tocity):
            if fromcity != tocity:
                return {'itinerary': ['Basel', 'Liestal'], 'total': 1}
            return []

        def fake_locations(itinerary):
            return [coordinates[name] for name in itinerary]

        app.add_vehicle(10, 'Basel')
        monkeypatch.setattr(app.route_planner, "get_route", fake_route)
        monkeypatch.setattr(app.route_planner, "get_locations", fake_locations)

        job = app.set_job('Basel', 'Liestal')
        assert job['dist'] == 1
    
  6. run pytest

7.2. Route Planner

  1. create the package model and add the test class LinkTest

    package org.inmods.routeplanner.model;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.ArrayList;
    import java.util.List;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    
    /** Unit test for {@code Link.isIncludedIn} against the cities Basel and Liestal. */
    class LinkTest {

      static List<City> cities;

      /** Builds the shared city list once for all tests. */
      @BeforeAll
      public static void setup(){
        cities = new ArrayList<>();
        for (String name : new String[] {"Basel", "Liestal"}) {
          City city = new City();
          city.setName(name);
          cities.add(city);
        }
      }

      /** A link from Basel to Bern is expected to count as included. */
      @Test
      public void cityIsIncluded(){
        Link link = new Link();
        link.setFrom("Basel");
        link.setTo("Bern");
        boolean included = link.isIncludedIn(cities);
        assertTrue(included);
      }
    }
    
  2. create the package controller and add the test class RouteControllerTest

    package org.inmods.routeplanner.model;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.ArrayList;
    import java.util.List;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    
    // NOTE(review): this step announces the test class RouteControllerTest in the
    // package controller, but the listing repeats LinkTest from the previous
    // step (same package, same class name). The intended listing appears to be
    // missing - replace it once the original source is available.
    class LinkTest {
    
      static List<City> cities;
    
      // creates the two cities Basel and Liestal once for all tests
      @BeforeAll
      public static void setup(){
        cities = new ArrayList<City>();
        City city = new City();
        city.setName("Basel");
        cities.add(city);
        city = new City();
        city.setName("Liestal");
        cities.add(city);
      }
      // expects the link Basel - Bern to be reported as included in the city list
      @Test
      public void cityIsIncluded(){
        Link l = new Link();
        l.setFrom("Basel");
        l.setTo("Bern");
        assertTrue(l.isIncludedIn(cities));
      }
    }
    
  3. create the package repository and add the class RedisContainer to this repository test directory.

    package org.inmods.routeplanner.repository;
    
    import org.testcontainers.containers.GenericContainer;
    
    /**
     * Base class for repository tests that need a Redis server.
     *
     * <p>Starts one Redis container when the class is loaded and publishes its
     * address as system properties, so the Spring context of every subclass
     * connects to this container.
     */
    public class RedisContainer {
      private static final GenericContainer<?> redis;

      static {
        redis = new GenericContainer<>("redis:6-alpine")
            .withExposedPorts(6379);
        redis.start();

        String host = redis.getHost();
        String port = String.valueOf(redis.getFirstMappedPort());
        System.setProperty("spring.redis.host", host);
        System.setProperty("spring.redis.port", port);
        // Spring Boot 3 reads the Redis connection from spring.data.redis.*;
        // setting both key sets keeps the tests working on either generation.
        System.setProperty("spring.data.redis.host", host);
        System.setProperty("spring.data.redis.port", port);
      }
    }
    
  4. create the test class CityRepositoryTest in the package repository

    import static org.junit.jupiter.api.Assertions.assertEquals;
    
    import java.util.List;
    import org.inmods.routeplanner.model.City;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.geo.Distance;
    import org.springframework.data.geo.Metrics;
    import org.springframework.data.geo.Point;
    
    @SpringBootTest
    class CityRepositoryTest extends RedisContainer {
      @Autowired
      CityRepository cityRepository;
    
      @Test
      void findByName() {
        List<City> cities = cityRepository.findByName("Basel");
        assertEquals("Basel", cities.get(0).getName());
      }
    
      @Test
      void findByLocationNear() {
        Distance dist = new Distance(10.0, Metrics.KILOMETERS);
        List<City> cities = cityRepository.findByLocationNear(new Point( 7.5, 47.5), dist);
        assertEquals("Basel", cities.get(0).getName());
      }
    }
    
  5. create the test class LinkRepositoryTest in the package repository

    package org.inmods.routeplanner.repository;
    
    import static org.junit.jupiter.api.Assertions.assertEquals;
    
    import org.inmods.routeplanner.model.Link;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    /** Integration test for {@code LinkRepository} against the Redis test container. */
    @SpringBootTest
    class LinkRepositoryTest extends RedisContainer {

      @Autowired
      LinkRepository linkRepository;

      /** The repository holds exactly one link, leading from Basel to Liestal. */
      @Test
      public void findAll(){
        // single-element array: a counter that the lambda below may modify
        int[] count = {0};
        linkRepository.findAll().forEach(link -> {
          assertEquals("Basel", link.getFrom());
          assertEquals("Liestal", link.getTo());
          count[0]++;
        });
        assertEquals(1, count[0]);
      }
    }
    
  6. create the directory test/resources and mark it as "Test Resources Root"
  7. add the file cities.txt with following content to this resource directory (Note: tab instead of spaces):

    Basel Switzerland  162816 47.57  7.58
    Liestal  Switzerland  12831  47.49  7.73
    
  8. add the file links.txt with following content to this resource directory:

    Basel Liestal
    
  9. create the test class RouteServiceTest

    package org.inmods.routeplanner.service;
    
    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.when;
    
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.inmods.routeplanner.model.City;
    import org.inmods.routeplanner.model.Link;
    import org.inmods.routeplanner.model.Link.TransportMode;
    import org.inmods.routeplanner.repository.CityRepository;
    import org.inmods.routeplanner.repository.LinkRepository;
    import org.junit.jupiter.api.Test;
    import org.mockito.InjectMocks;
    import org.mockito.Mock;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.geo.Distance;
    import org.springframework.data.geo.Point;
    
    @SpringBootTest
    class RouteServiceTest {
      @InjectMocks
      RouteService routeService;
    
      @Mock
      CityRepository cityRepository;
      @Mock
      LinkRepository linkRepository;
    
      /** set up a dummy list of 3 cities "from", "between", "to"
       *
       * @return list of 3 cities
       */
      private List <City> createCities(){
        City from = new City();
        from.setName("from");
        from.setId("from");
        from.setLocation(new Point(0,0));
        City between = new City();
        between.setName("between");
        between.setId("between");
        between.setLocation(new Point(0,1));
        City to = new City();
        to.setName("to");
        to.setId("to");
        to.setLocation(new Point(1,1));
        return Arrays.asList(from, between, to);
      }
    
      @Test
      public void findShortestPath(){
        List<City> cities = createCities();
        // add to links from - between and between - to
        Link frombetween = new Link(cities.get(0).getName(), cities.get(1).getName(), 1.0, TransportMode.TRAIN);
        Link betweenTo = new Link(cities.get(1).getName(), cities.get(2).getName(), 1.0, TransportMode.TRAIN);
    
        for(City c: cities) {
          when(cityRepository.findByName(c.getName())).thenReturn(Collections.singletonList(c));
        }
        when(cityRepository.findByLocationNear(any(Point.class), any(Distance.class))).thenReturn(
            cities);
        when(linkRepository.findAll()).thenReturn(Arrays.asList(frombetween, betweenTo));
    
        List<Link> itinerary = routeService.findShortestPath(cities.get(0).getName(), cities.get(2).getName());
    
        assertEquals(2, itinerary.size());
        // should contain "from - between", "between - to"
        assertEquals(itinerary.get(0).getFrom(), cities.get(0).getName());
        assertEquals(itinerary.get(1).getFrom(), cities.get(1).getName());
      }
    
  10. Add the jacoco maven plugin to the build element in pom.xml

7.3. Order Processing

  1. create test class OrderRepositoryTest

    package org.inmods.orderprocessing.repository;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.List;
    import org.inmods.orderprocessing.model.Order;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.jdbc.Sql;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * Integration test for {@code OrderRepository}.
     *
     * <p>Each test runs in a transaction that is rolled back afterwards, so
     * the tests do not see each other's rows.
     */
    @Transactional
    @SpringBootTest
    class OrderRepositoryTest {

      @Autowired
      OrderRepository orderRepository;

      /** Saving a new order assigns a generated id. */
      @Test
      public void save(){
        Order order = new Order();
        // set both cities; the start city was left unset before because
        // setToCity was called twice
        order.setFromCity("from");
        order.setToCity("to");
        orderRepository.save(order);
        assertNotNull(order.getId());
      }

      /** Of the two rows in test-orders.sql exactly one starts in Basel. */
      @Sql({"/test-orders.sql"})
      @Test
      public void findByFromCity(){
        List<Order> orders = orderRepository.findByFromCity("Basel");
        assertEquals(1,orders.size());

      }
    }
    
  2. create file test-orders.sql with following content in directory test/resources:

    insert into delivery_order(id, from_city, to_city) values (1, 'Basel', 'Aarau');
    insert into delivery_order(id, from_city, to_city) values (2, 'Aarau', 'Basel');
    
  3. add dependencies:

    <!-- Testcontainers module providing BrowserWebDriverContainer. -->
    <!-- NOTE(review): no <version> is given for any of these dependencies;
         presumably they are managed by the Spring Boot parent POM - confirm.
         The test below also imports org.testcontainers.junit.jupiter.*, so the
         Testcontainers junit-jupiter artifact must be on the test classpath
         as well - verify it is already declared in pom.xml. -->
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>selenium</artifactId>
      <scope>test</scope>
    </dependency>
    
    <!-- Selenium client used to talk to the browser running in the container. -->
    <dependency>
      <groupId>org.seleniumhq.selenium</groupId>
      <artifactId>selenium-remote-driver</artifactId>
      <scope>test</scope>
    </dependency>
    
    <!-- Provides ChromeOptions, used as the capabilities of the browser container. -->
    <dependency>
      <groupId>org.seleniumhq.selenium</groupId>
      <artifactId>selenium-chrome-driver</artifactId>
      <scope>test</scope>
    </dependency>
    
  4. create test class OrderProcessingWebTest

    package org.inmods.orderprocessing;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import org.junit.jupiter.api.Test;
    import org.openqa.selenium.By;
    import org.openqa.selenium.WebDriver;
    import org.openqa.selenium.WebElement;
    import org.openqa.selenium.chrome.ChromeOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
    import org.springframework.boot.web.server.LocalServerPort;
    import org.testcontainers.containers.BrowserWebDriverContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    
    /**
     * Browser-level login test for the order-processing web front end.
     *
     * <p>The application is started on a random port and a Chrome browser is
     * run in a Testcontainers-managed container. The test signs in with the
     * credentials configured under {@code spring.security.user.*} and expects
     * to land on the "All Orders" page.
     */
    @Testcontainers
    @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
    class OrderProcessingWebTest {

      /**
       * Address under which the browser container reaches the application
       * running on the host.
       * NOTE(review): 172.17.0.1 is presumably the gateway of Docker's default
       * bridge network on Linux; other setups may need a different address -
       * confirm for your environment.
       */
      private static final String HOST_ADDRESS = "172.17.0.1";

      @LocalServerPort
      private int port;

      // Parameterized with <?> instead of the raw type to avoid unchecked warnings.
      @Container
      private BrowserWebDriverContainer<?> container =
          new BrowserWebDriverContainer<>()
              .withCapabilities(new ChromeOptions());

      @Value("${spring.security.user.name}")
      String username;
      @Value("${spring.security.user.password}")
      String password;

      /** Returns the web driver connected to the browser in the container. */
      private WebDriver getWebDriver(){
        return this.container.getWebDriver();
      }

      /**
       * Opens the application, checks that the sign-in form is shown, logs in
       * and verifies the title of the page shown afterwards.
       */
      @Test
      public void testLogin() throws Exception {
        WebDriver driver = getWebDriver();
        driver.get("http://" + HOST_ADDRESS + ":" + port);

        // an unauthenticated request must end up on the sign-in form
        WebElement title = driver.findElement(By.xpath("//h2"));
        assertEquals("Please sign in", title.getText());

        driver.findElement(By.id("username")).sendKeys(username);
        driver.findElement(By.id("password")).sendKeys(password);
        driver.findElement(By.xpath("//button[@type='submit']")).click();

        // NOTE(review): the reload presumably ensures the post-login page is
        // fully loaded before the title is read - confirm it is required.
        driver.navigate().refresh();
        assertEquals("All Orders", driver.getTitle());
      }
    }
    

8. Operation

8.1. Running on localhost

Steps to put the system into operation on localhost:

  1. Invoke route-planner service

    cd route-planner
    # start Redis detached (-d) and publish it on host port 6379
    docker run --rm -p 6379:6379 -d redis:6-alpine
    # start the service on port 8181
    java -Dserver.port=8181 -jar target/route-planner-0.0.1-SNAPSHOT.jar
    
  2. invoke event-streaming service

    cd event-streaming
    # no -d flag: the services stay attached to this terminal
    docker compose up
    
  3. add event-streamer entry to fleet-manager/config-localhost.yaml and change route-planner port:

    # route-planner address; 8181 is the port passed via -Dserver.port in step 1
    route_planner:
      host: 'localhost'
      port: 8181
    # event-streaming address
    # NOTE(review): 9092 is presumably the broker port published by the
    # event-streaming compose file - confirm
    event_streamer:
      host: 'localhost'
      port: 9092
    # NOTE(review): presumably the vehicles of the fleet with their start city;
    # the unit of speed is not visible here - confirm
    vehicles:
     - city: Basel
       speed: 20
     - city: Aarau
       speed: 16
    
  4. invoke fleet-manager service

    source inmods-venv/bin/activate
    cd fleet-manager
    flask --app manager run
    
  5. invoke order-processing service

    cd order-processing
    # start the service on port 8080 (the space before -jar is required,
    # otherwise "-jar" becomes part of the server.port value)
    java -Dserver.port=8080 -jar target/order-processing-0.0.1-SNAPSHOT.jar
    
  6. invoke delivery-tracking

    cd delivery-tracking
    # -p is an option of the "run" command, so it must follow it;
    # port 5001 is the one opened in the browser in the last step
    flask --app panel run -p 5001
    
  7. log in at http://localhost:8080 and create a new order (e.g. Basel - Freiburg)
  8. open delivery tracking front end: http://localhost:5001

8.2. Running with docker compose

For running the application the following docker images need to be created:

  • hub.semafor.ch/semafor/inmods/route-planner
  • hub.semafor.ch/semafor/inmods/order-processing
  • hub.semafor.ch/semafor/inmods/fleet-manager
  • hub.semafor.ch/semafor/inmods/delivery-tracking

The images of the Spring Boot based services can be built directly with Maven, without a Dockerfile (see https://www.baeldung.com/spring-boot-docker-images). For the Python based services a Dockerfile must be created.

  1. Add the file Dockerfile to the directory fleet-manager:

    # Build stage: creates the virtual environment, installs the Python
    # requirements and runs make.
    FROM python:3.12-slim AS build
    ENV DEBIAN_FRONTEND=noninteractive
    RUN apt-get update && apt-get -y --no-install-recommends install make
    RUN python -m venv /opt/venv
    
    WORKDIR /app
    # With more than one source the destination has to be a directory that
    # ends with a slash.
    COPY requirements.txt Makefile delivery_job.proto ./
    
    # Put the virtual environment first on PATH so pip installs into /opt/venv.
    ENV PATH="/opt/venv/bin:$PATH"
    # NOTE(review): make presumably generates the Python code for
    # delivery_job.proto - confirm against the Makefile.
    RUN pip install -r requirements.txt && make
    # -------------------
    # Runtime stage: takes over the virtual environment and the contents of
    # /app from the build stage and adds the application sources.
    FROM python:3.12-slim
    COPY --from=build /opt/venv /opt/venv
    COPY --from=build /app /app
    ENV PATH="/opt/venv/bin:$PATH"
    ENV CONFIG_FILE="./etc/config.yaml"
    
    WORKDIR /app
    COPY server.py routeplanner.py manager.py vehicle.py ./
    COPY etc etc
    
    EXPOSE 5000
    # Run the service as an unprivileged system user instead of root.
    RUN addgroup --system fleetmgr && adduser --system --group fleetmgr
    USER fleetmgr
    #
    CMD [ "python", "server.py"]
    
  2. Add the file Dockerfile to the directory delivery-tracking:

    # Build stage: creates the virtual environment and installs the Python
    # requirements into it.
    FROM python:3.12-slim AS build
    RUN python -m venv /opt/venv
    
    WORKDIR /app
    COPY requirements.txt .
    # Put the virtual environment first on PATH so pip installs into /opt/venv.
    ENV PATH="/opt/venv/bin:$PATH"
    RUN pip install -r requirements.txt
    # -------------------
    # Runtime stage: only the virtual environment is taken over from the
    # build stage.
    FROM python:3.12-slim
    COPY --from=build /opt/venv /opt/venv
    ENV PATH="/opt/venv/bin:$PATH"
    # Application started by "flask run" in CMD below.
    ENV FLASK_APP="panel"
    ENV CONFIG_FILE="./etc/config.ini"
    
    WORKDIR /app
    COPY panel panel
    COPY etc etc
    
    EXPOSE 5000
    # Run the service as an unprivileged system user instead of root.
    RUN addgroup --system flask && adduser --system --group flask
    USER flask
    # Listen on all interfaces so the server is reachable from outside the
    # container.
    CMD [ "flask", "run", "--host=0.0.0.0" ]
    
  3. Create file docker-compose.yaml with following content:

    services:
      # Single Kafka node acting as broker and controller.
      event-streamer:
        image: apache/kafka:4.0.0
        environment:
          - KAFKA_NODE_ID=1
          - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
          - KAFKA_PROCESS_ROLES=broker,controller
          # NOTE(review): the host name is set on the listener while the
          # advertised listener has none; the more common form is the other
          # way round (PLAINTEXT://:9092 / PLAINTEXT://event-streamer:9092)
          # - confirm this is intended.
          - KAFKA_LISTENERS=PLAINTEXT://event-streamer:9092,CONTROLLER://:9093
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092
          - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
          - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
          - KAFKA_CONTROLLER_QUORUM_VOTERS=1@event-streamer:9093
    
      # Redis instance used by route-planner (see SPRING_DATA_REDIS_HOST below).
      redis:
        image: "redis:7-alpine"
    
      route-planner:
        image: "hub.semafor.ch/semafor/inmods/route-planner:latest"
        environment:
          - SPRING_DATA_REDIS_HOST=redis
          - SPRING_DATA_REDIS_PORT=6379
        depends_on:
          - redis
    
      fleet-manager:
        image: "hub.semafor.ch/semafor/inmods/fleet-manager:latest"
        depends_on:
          - event-streamer
          - route-planner
        # NOTE(review): presumably restarts the service when it exits before
        # its dependencies accept connections - confirm.
        restart: on-failure
    
      order-processing:
        image: "hub.semafor.ch/semafor/inmods/order-processing:latest"
        environment:
          # gRPC address of the fleet-manager service (port 5000 is the one
          # exposed by its Dockerfile)
          - SPRING_GRPC_CLIENT_CHANNELS_FLEET-MANAGER_ADDRESS=fleet-manager:5000
          - SERVER_PORT=8080
        # order entry front end, published on host port 8080
        ports:
          - "8080:8080"
    
      delivery-tracking:
        image: "hub.semafor.ch/semafor/inmods/delivery-tracking:latest"
        # tracking front end, published on host port 5000
        ports:
          - "5000:5000"
        depends_on:
          - event-streamer
        restart: on-failure
    
  4. invoke docker compose up -d
  5. check logs and status:

    # output of all services
    docker compose logs
    
    # state of the containers
    docker compose ps
    
  6. enter order: http://localhost:8080
  7. visit tracking: http://localhost:5000

Author: Ronald Tanner

Created: 2025-09-12 Fri 16:08

Validate