Close

Python Code

A project log for A Cheap 24-bit Differential ADC for Raspberry Pi

Making the HX711 into a more general purpose differential data acquisition module with an interface to Raspberry Pi.

bud-bennettBud Bennett 07/12/2017 at 17:300 Comments

I tried to get the the HX711 routine from the pigpio library working, but it kept stopping after a couple of minutes. This is what I ended up with. Occasionally, the data gets corrupted when the linux kernel interferes with the data transfer so it has to be processed to avoid big spikes in the data stream.

This code works at the physical layer -- bit banging the data from the HX711. It must be called after the DOUT line transitions to a low state, indicating that the HX711 has finished a conversion.

myHX711.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-

import RPi.GPIO as GPIO
import time
import math

GPIO.setmode(GPIO.BCM)

class HX711:
    """ Configured as just a 24-bit differential ADC """

    def __init__(self, dout, pd_sck, gain=32):
        self.PD_SCK = pd_sck
        self.DOUT = dout
        GPIO.setup(self.PD_SCK, GPIO.OUT)
        GPIO.setup(self.DOUT, GPIO.IN)

        if gain is 128:
            self.GAIN = 1
        elif gain is 64:
            self.GAIN = 3
        elif gain is 32:
            self.GAIN = 2

		#initialize
        GPIO.output(self.PD_SCK, False)
        time.sleep(0.4)  # settle time
        GPIO.wait_for_edge(self.DOUT, GPIO.FALLING)
        self.read()

    def read(self):
        """ Clock in the data.
            PD_SCK must be low to keep the HX711 in an active state.
            The data from DOUT is clocked in on the falling edge of PD_SCK.
            Min PD_SCK high time = 0.2us, Max PD_SCK high time = 50us.
            Min PD_SCK low time = 0.2us. """
        
        databits = []
        for j in range(2, -1, -1):
            xbyte = 0x00  # intialize data byte
            for i in range(7, -1, -1):
                GPIO.output(self.PD_SCK, True)
                xbyte = xbyte | GPIO.input(self.DOUT) << i
                GPIO.output(self.PD_SCK, False)
                
            databits.append(xbyte)

        #set channel and gain factor for next reading
        for i in range(self.GAIN):
            GPIO.output(self.PD_SCK, True)
            GPIO.output(self.PD_SCK, False)
        
        # convert to integer from 2's complement bytes
        result = (databits[0] << 16) + (databits[1] << 8) + databits[2]
        if result > 8388607: result -= 16777216
        
        return result

    def meanstdv(self, x):
        """
        Calculate mean and standard deviation of data x[]:
        mean = {\sum_i x_i \over n}
        std = math.sqrt(\sum_i (x_i - mean)^2 \over n-1)
        """
        n = len(x)
        mean, std = 0, 0
        for a in x:
            mean +=  a
        mean = mean / float(n)
        for a in x:
            std += (a - mean)**2
        if(n > 1):
            std = math.sqrt(std / float(n-1))
        else:
            std = 0.0
        return mean, std


# test code

if __name__ == '__main__':
    DOUT = 25
    PD_SCK = 24
    gain = 128
    print("Gain = {}".format(gain))
    hx = HX711(DOUT, PD_SCK, gain)

    while True:
        GPIO.wait_for_edge(DOUT, GPIO.FALLING)
        try:
            val = hx.read()
            print("value = {0} ".format(val))
            
        except KeyboardInterrupt:
            GPIO.cleanup()

    GPIO.cleanup()

An this is an example of how to use it, along with the pulse swallowing code.

#!/usr/bin/env python3
#-*- coding:utf-8 -*-

'''
An example of getting data from the HX711 and swallowing artifacts. No guarantees that
this particular code block will work, since I butchered it pretty heavily. There are
additional processes (not shown) that processes the data into a seedlink stream and
web server -- pretty esoteric -- I didn't think that there would be much interest. 
But the general method should be pretty clear.
'''

from myHX711 import HX711
import time
from datetime import datetime, timedelta, date
from multiprocessing import Process, Manager
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)

# instantiate hx711
gain = 128  # allowed values: 128, 64, 32
print("gain = {}".format(gain))
DOUT = 25
PD_SCK = 24
RATE = 23
GPIO.setup(DOUT, GPIO.IN)
GPIO.setup(RATE, GPIO.OUT)
# set rate high (sample_rate = 28.94 Hz with 4.00 MHz xtal)
GPIO.output(RATE, 1)
hx = HX711(dout=DOUT, pd_sck=PD_SCK, gain=gain)
# the Manager provides communication between processes. In this case between getData and the others.
mgr = Manager()
# makes a global list variable that is compatible with multiprocessing module.
resultArray = mgr.list()

def getData():
    global resultArray, DOUT
    while True:
        GPIO.wait_for_edge(DOUT, GPIO.FALLING,bouncetime=1)
        tn = time.time() * 1000  # creates a unix timestamp with millisecond resolution
        Count = hx.read() # just returns the ADC result
        resultArray = signalProcessor(tn, Count, resultArray)
        if (len(resultArray) > int(sample_rate * 3600/2)):
            # store 1/2 hour of data
            # must use pop method for manager.list object instead of list=list[x:] method
            resultArray.pop(0)
    

def signalProcessor(tn, data, rArray):
    '''
    Pulse swallower:
    Swallow data, by using previous datapoint, if difference between Count(n-1) and
    Count(n) is greater than X, OR if the data = -1 (an indication that the data
    returned by the HX711 got clobbered.)
    Sample rate checker:
    Checks if period between samples is larger than Y. Kernal housekeeping obliterated
    an entire sample. This never happens at slower sample rates (< 30Hz).
    '''

    if (len(rArray) > 2):
        prevData = rArray[-1][1]
        prevTime = rArray[-1][0]
        #if ( 1/20 < (tn - prevTime)/1000 or (tn - prevTime)/1000 < 1/35 ):
        if ( 1/20 < (tn - prevTime)/1000):
            print('Bad Sample Rate {0}. Index={1}'.format(1000/(tn - prevTime), len(seed)))
            for n in range(0, 2):
                rArray.append([tn, data])
            return rArray
        if (((prevData - data)^2 > 3000^2 and prevData != rArray[-2][1]) or data == -1):
            rArray.append([tn, prevData])
            print("{0} Swallowed Pulse {1}".format(datetime.now(),prevData - data))
        else:
            rArray.append([tn, data])
    else:
        rArray.append([tn, data])
    
    return rArray

def wiggle():
	'''Makes a 10 minute square wave.'''
    GPIO.setup(22, GPIO.OUT)
    while True:
        print('LOW')
        GPIO.output(22, 0)
        time.sleep(60 * 5)
        print('HIGH')
        GPIO.output(22, 1)
        time.sleep(60 * 5)
        

run_wiggle = Process(target=wiggle)
run_wiggle.start()

run_getData = Process(target=getData)
run_getData.start()

GPIO.cleanup()
run_wiggle.join()
run_getData.join()

Discussions