Analyze ALPR Data Using Spark


ALPR (Automated License Plate Reader) data is often used in law enforcement, but it can also be used to analyze vehicle users’ travel behavior. Here we use Spark to extract trip records from ALPR data.

1. Preliminary

Each ALPR record contains a vehicle ID, a timestamp, and a transect ID, which indicates where the vehicle was detected. As shown in Figure 1, the time gap between two consecutive records can be used to decide whether they belong to the same trip. To avoid splitting one trip into two, we choose a relatively large threshold: if the time gap between two consecutive ALPR records of the same vehicle is longer than 60 min, the records are treated as belonging to two different trips.

Figure 1 Distribution of time gap between consecutive records
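
The threshold rule can be tried out on plain timestamps before any Spark code is involved. The following is a minimal sketch, not the code used in this post: the names GapSplit, ThresholdMs and splitByGap are made up for illustration, and timestamps are assumed to be epoch milliseconds. Like genTrip in the next section, it keeps two records in the same trip only when their gap is strictly smaller than the threshold.

```scala
object GapSplit {
  // 60 minutes expressed in milliseconds (assumption: timestamps are epoch millis).
  val ThresholdMs: Long = 60L * 60L * 1000L

  // Splits detection times into trips: consecutive times stay in the same trip
  // while their gap is smaller than thresholdMs; otherwise a new trip starts.
  def splitByGap(times: Seq[Long], thresholdMs: Long = ThresholdMs): List[List[Long]] =
    times.sorted.foldLeft(List.empty[List[Long]]) {
      case (current :: done, t) if t - current.head < thresholdMs =>
        (t :: current) :: done // small gap: add to the current trip
      case (groups, t) =>
        List(t) :: groups // first record or large gap: start a new trip
    }.map(_.reverse).reverse // groups were built back to front
}
```

For example, detections at minutes 0, 10 and 25 followed by detections at minutes 200 and 210 are split into two trips, because the gap between minute 25 and minute 200 exceeds the threshold.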

2. Code

First, create two classes named LicenseRecord and TripRecord. An instance of LicenseRecord holds one vehicle’s ID and all of its license records; instances of TripRecord are our outputs.

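// All records of one vehicle: licenseData maps detection time -> transect ID
class LicenseRecord(vID: String, lData: Map[Long, String]) extends Serializable {
  val vehicleID: String = vID
  val licenseData: Map[Long, String] = lData

  // Convenience constructor for a single detection
  def this(vehicleID: String, time: Long, transectId: String) = this(vehicleID, Map(time -> transectId))

  // Merge the records of two instances that share the same vehicle ID
  def +(dataWithSameVehicleID: LicenseRecord): LicenseRecord =
    new LicenseRecord(this.vehicleID, this.licenseData ++ dataWithSameVehicleID.licenseData)
}

// One trip: a run of consecutive detections of the same vehicle
case class TripRecord(vID: String, stTime: Long, edTime: Long, lData: Map[Long, String]) extends Serializable {
  val vehicleID: String = vID
  val startTime: Long = stTime
  val endTime: Long = edTime
  val licenseData: Map[Long, String] = lData
  val size: Int = lData.size

  // Convenience constructor for a trip that consists of a single detection
  def this(vehicleID: String, time: Long, transectId: String) = this(vehicleID, time, time, Map(time -> transectId))

  // Append a later fragment: keep this start time and take the other's end time
  def +(dataWithSameVehicleID: TripRecord): TripRecord =
    new TripRecord(this.vehicleID, this.startTime, dataWithSameVehicleID.endTime, this.licenseData ++ dataWithSameVehicleID.licenseData)

  // Assumed definition (isLegal is called by genTrip but not shown in this post):
  // a trip counts as legal when it has at least minPts records
  def isLegal(minPts: Int): Boolean = size >= minPts
}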
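
One property of this design is worth keeping in mind: licenseData is a Map keyed by timestamp, so merging with ++ keeps only one entry per timestamp. If the same vehicle were recorded at two transects at exactly the same time, the right-hand value would replace the left-hand one. The short sketch below shows this behavior of Scala's immutable Map; the object name MergeDemo, the timestamps and the transect IDs are made up for illustration.

```scala
object MergeDemo {
  // Two partial record sets for the same vehicle: detection time -> transect ID
  val first: Map[Long, String] = Map(1000L -> "A")
  val second: Map[Long, String] = Map(1000L -> "B", 2000L -> "C")

  // ++ is the operation LicenseRecord.+ applies to licenseData.
  // For the duplicate key 1000L, the value from the right-hand map is kept.
  val merged: Map[Long, String] = first ++ second
}
```

Here merged contains two entries rather than three. Whether such collisions occur in practice depends on the resolution of the timestamps in the data.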

Then implement a method in LicenseRecord that splits one vehicle’s license records into trips.

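import scala.collection.mutable.ListBuffer

// Method of LicenseRecord. minTime is the gap threshold, in the same unit as the timestamps.
// isLegal(minPts) is assumed to require at least minPts records in a trip.
def genTrip(minTime: Long, minPts: Int): List[TripRecord] = {
  val tripListBuffer = ListBuffer[TripRecord]()
  var tempTrip: Option[TripRecord] = None // trip being built; None before the first record
  var oldTime = 0L
  // Walk through the records in chronological order
  for ((newTime, newTransect) <- licenseData.toSeq.sortBy(_._1)) {
    val point = new TripRecord(vID, newTime, newTransect)
    tempTrip = tempTrip match {
      case Some(trip) if newTime - oldTime < minTime =>
        Some(trip + point) // small gap: extend the current trip
      case Some(trip) =>
        if (trip.isLegal(minPts)) tripListBuffer += trip // close the finished trip
        Some(point) // and start a new one
      case None =>
        Some(point) // the first record starts the first trip
    }
    oldTime = newTime
  }
  // Flush the last trip, which the loop itself never closes
  tempTrip.filter(_.isLegal(minPts)).foreach(tripListBuffer += _)
  tripListBuffer.toList
}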

Finally, use Spark to process all vehicles in parallel.

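// Read data (loadFilePath, minTime and minPts are defined elsewhere)
val plateData = sqlContext.read.parquet(loadFilePath)

// Preprocess data: build one LicenseRecord per row, key it by vehicle ID,
// then merge all records of the same vehicle
val plateRDD = plateData.rdd
  .map(row => new LicenseRecord(row.getString(0), row.getTimestamp(1).getTime, row.getString(2)))
  .map(l => (l.vehicleID, l))
  .reduceByKey(_ + _)

// Generate trips; getTime returns milliseconds, so minTime must be given in milliseconds too
val tripRDD = plateRDD.flatMapValues(licenseRecord => licenseRecord.genTrip(minTime, minPts))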

3. Result

Figure 2 shows the trip recognition result for one individual. Notice that some regular patterns appear in his trip records. By defining a new function that measures the difference between trip records, one can use a process similar to the one above to distinguish different kinds of trips and learn more about human travel behavior.

Figure 2 Trip recognition result of Su2
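
This post does not define the comparison function, so the following is only one possible choice, given as a hedged sketch: the Jaccard similarity between the sets of transects visited by two trips. The names TripSimilarity and jaccard are hypothetical, and each trip is represented by its licenseData map (detection time -> transect ID).

```scala
object TripSimilarity {
  // Jaccard similarity of the transect sets of two trips:
  // |intersection| / |union|, between 0.0 (no shared transect) and 1.0 (same set).
  def jaccard(a: Map[Long, String], b: Map[Long, String]): Double = {
    val transectsA = a.values.toSet
    val transectsB = b.values.toSet
    val union = transectsA.union(transectsB)
    if (union.isEmpty) 0.0 // two empty trips: defined as 0.0 here by convention
    else transectsA.intersect(transectsB).size.toDouble / union.size
  }
}
```

This measure ignores the order and timing of detections. A comparison meant to separate, say, commuting trips from occasional trips would probably also need the start time or the sequence of transects.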