bigqueue

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

README

BigQueue-go

BigQueue-go is pure Golang implementation for big, fast and persistent queue based on memory mapped file. Its file storage structures is totally compatible with [BigQueue](https://github.com/bulldog2011/bigqueue)

Go Report Card Build Status codecov Releases Godoc LICENSE

Feature Highlight:

  1. Fast: close to the speed of direct memory access, both enqueue and dequeue are close to O(1) memory access.
  2. Big: the total size of the queue is only limited by the available disk space.
  3. Persistent: all data in the queue is persisted on disk, and is crash resistant.
  4. Reliable: OS will be responsible to presist the produced messages even your process crashes.
  5. Realtime: messages produced by producer threads will be immediately visible to consumer threads.
  6. Memory-efficient: automatic paging & swapping algorithm, only most-recently accessed data is kept in memory.
  7. Thread-safe: multiple threads can concurrently enqueue and dequeue without data corruption.
  8. Simple&Light : pure Golang implements without any 3rd-party library

Quick Start

Installing

To start using BigQueue-Go, install Go and run go get:

$ go get github.com/jhunters/bigqueue

Importing bigqueue

To use bigqueue as an file implements queue, import as:


import	"github.com/jhunters/bigqueue"

func main() {
	var queue = new(bigqueue.FileQueue)

	err := queue.Open(".", "testqueue", nil)

	if err != nil {
		fmt.Println(err)
	}
	defer queue.Close()
	
	data := []byte("hello jhunters")
	
	i, err := queue.Enqueue(data)
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("Enqueued index=", i, string(data))
	}
	
	index, bb, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
	}
	
	fmt.Println("Dequeue data:", index, string(bb))
}

The Big Picture

design

design

Benmark test

$ go test -bench=. -benchtime=3s -run=^$
goos: linux
goarch: amd64
pkg: github.com/bigqueue
Benchmark_EnqueueOnly-8                  2319403              1479 ns/op
Benchmark_DequeueOnly-8                  4704715               743 ns/op
Benchmark_EnqueueDequeue-8               1536244              2303 ns/op
Benchmark_ParallelEnqueueDequeue-8       1254315              2760 ns/op
PASS
ok      github.com/bigqueue     40.028s

License

BigQueue-Go is Apache 2.0 licensed.

Documentation

Overview

package bigqueue implements is pure Golang implementation for big, fast and persistent queue based on memory mapped file.

Index

Constants

View Source
const (

	// data file size
	DefaultDataPageSize = 128 * 1024 * 1024

	DefaultIndexItemsPerPage = 17

	MaxInt64 = 0x7fffffffffffffff

	IndexFileName = "index"
	DataFileName  = "data"
	MetaFileName  = "meta_data"
	FrontFileName = "front_index"
)

Variables

View Source
var (
	ErrEnqueueDataNull = errors.New("Enqueue data can not be null")

	IndexOutOfBoundTH = errors.New("Index is valid which should between tail and head index")
)

These errors can be returned when opening or calling methods on a DB.

View Source
var DefaultOptions = &Options{
	DataPageSize:      DefaultDataPageSize,
	indexPageSize:     defaultIndexPageSize,
	IndexItemsPerPage: DefaultIndexItemsPerPage,
	itemsPerPage:      defaultItemsPerPage,
	GcLock:            false,
}

default options

Functions

func Assert

func Assert(condition bool, message string, v ...interface{})

_assert will panic with a given formatted message if the given condition is false.

func BytesToInt

func BytesToInt(b []byte) int64

byte to int64

func BytesToInt32

func BytesToInt32(b []byte) int32

bytes to int32

func GetFileName

func GetFileName(prefix string, suffix string, index int64) string

func GetFiles

func GetFiles(pathname string) (*list.List, error)

get all files from current directory. not include any sub directories

func IntToBytes

func IntToBytes(n int64) []byte

int64 to byte array

func Mod

func Mod(val int64, bits int) int64

func PathExists

func PathExists(path string) (bool, error)

func RemoveFiles

func RemoveFiles(pathname string) error

remove all files from current directory. not include any sub directories

Types

type DB

type DB struct {
	// If you want to read the entire database fast, you can set MmapFlag to
	// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
	MmapFlags int

	InitialMmapSize int
	// contains filtered or unexported fields
}

DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.

func (*DB) Close

func (db *DB) Close() error

Close releases all resources.

func (*DB) GoString

func (db *DB) GoString() string

GoString returns the Go string representation of the database.

func (*DB) Open

func (db *DB) Open(mode os.FileMode) error

func (*DB) Path

func (db *DB) Path() string

Path returns the path to currently open database file.

type DBFactory

type DBFactory struct {
	InitialMmapSize int
	// contains filtered or unexported fields
}

func (*DBFactory) Close

func (f *DBFactory) Close() error

type FileQueue

type FileQueue struct {
	// front index of the big queue,
	FrontIndex int64

	// head index of the array, this is the read write barrier.
	// readers can only read items before this index, and writes can write this index or after
	HeadIndex int64

	// tail index of the array,
	// readers can't read items before this tail
	TailIndex int64
	// contains filtered or unexported fields
}

func (*FileQueue) Close

func (q *FileQueue) Close() error

func (*FileQueue) Dequeue

func (q *FileQueue) Dequeue() (int64, []byte, error)

Retrieves and removes the front of a queue

func (*FileQueue) Enqueue

func (q *FileQueue) Enqueue(data []byte) (int64, error)

Adds an item at the queue and HeadIndex will increase

func (*FileQueue) EnqueueAsync

func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))

Adds an item at the queue and HeadIndex will increase Asynchouous mode will call back with fn function

func (*FileQueue) Gc

func (q *FileQueue) Gc() error

Delete all used data files to free disk space.

BigQueue will persist enqueued data in disk files, these data files will remain even after the data in them has been dequeued later, so your application is responsible to periodically call this method to delete all used data files and free disk space.

func (*FileQueue) IsEmpty

func (q *FileQueue) IsEmpty() bool

Determines whether a queue is empty

func (*FileQueue) Open

func (q *FileQueue) Open(dir string, queueName string, options *Options) error

Open the queue files

func (*FileQueue) Peek

func (q *FileQueue) Peek() (int64, []byte, error)

Retrieves the item at the front of a queue if item exist return with index id, item data

func (*FileQueue) Size

func (q *FileQueue) Size() int64

Total number of items available in the queue.

func (*FileQueue) Skip

func (q *FileQueue) Skip(count int64) error

type Options

type Options struct {

	// size in bytes of a data page
	DataPageSize int

	// if true enable write lock on gc action
	GcLock bool

	// the item count is  1 << IndexItemsPerPage
	IndexItemsPerPage int
	// contains filtered or unexported fields
}

type Queue

type Queue interface {
	Open(dir string, queueName string, options *Options) error

	// Determines whether a queue is empty
	// return ture if empty, false otherwise
	IsEmpty() bool

	// return avaiable queue size
	Size() int64

	// Append an item to the queue and return index no
	// if any error ocurres a non-nil error returned
	Enqueue(data []byte) (int64, error)

	EnqueueAsync(data []byte, fn func(int64, error))

	Dequeue() (int64, []byte, error)

	Peek() (int64, []byte, error)

	// To skip deqeue target number of items
	Skip(count int64) error

	Close() error

	// Delete all used data files to free disk space.
	Gc() error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL