123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- #Copyright 2017 Yann Weber
- #
- #This file is part of SndPipe.
- #
- #SndPipe is free software: you can redistribute it and/or modify
- #it under the terms of the GNU General Public License as published by
- #the Free Software Foundation, either version 3 of the License, or
- #any later version.
- #
- #SndPipe is distributed in the hope that it will be useful,
- #but WITHOUT ANY WARRANTY; without even the implied warranty of
- #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- #GNU General Public License for more details.
- #
- #You should have received a copy of the GNU General Public License
- #along with SndPipe. If not, see <http://www.gnu.org/licenses/>.
-
-
- import math
- import sys
- import struct
-
- BITMAX = lambda bsz: (1<<bsz)-1
-
- def sinwave_float32(step=0.01, endianess='little'):
- i = 0.0
- if endianess is None:
- e = ''
- elif endianess == 'little':
- e = '<'
- else:
- e = '>'
- while True:
- i += step
- i %= (math.pi*2)
- yield struct.pack('%cf'%e, math.sin(i))
-
- def sinwave(step=0.01, bitsz=8, signed=False):
- i = 0.0
- while True:
- i += step
- i %= (math.pi*2)
- if signed:
- yield int(math.sin(i) * (BITMAX(bitsz)/2))
- else:
- yield int((math.sin(i)+1)/2 * BITMAX(bitsz))
-
- def mixer(*waves):
- channels = []
- for wave in waves:
- if isinstance(wave, tuple):
- wave, vol = wave
- else:
- wave, vol = wave, 1.0
- channels.append((wave, vol/len(waves)))
- while True:
- yield int(sum([next(wave)*vol for wave, vol in channels]))
-
- def filter(wave, freq=10, high=True, glitch=False, bitsz=8):
- hist = [next(wave)]
-
- while True:
- c = next(wave)
- r = int(sum(hist)/len(hist))
- if high:
- if glitch:
- r = (c - r) % BITMAX(bitsz)
- elif c < r:
- r = 0
- else:
- r = c - r
- hist = [c] + (hist if len(hist) < freq else hist[:-1])
- yield r
-
- def lowpass(wave, freq=500):
- return filter(wave, freq, False)
-
- def highpass(wave, freq=500):
- return filter(wave, freq, True)
-
-
- def vol_lfo(wave, wave_arg, bitsz=8):
- while True:
- yield int(next(wave) * next(wave_arg) / BITMAX(bitsz))
-
-
- def output_sampler(*args, channels=2, samples=4096, freq=48000, bitsz=8,
- endianess='little', signed=False):
- if channels < 1:
- raise ValueError("channels must be > 1 but %d given" % channels)
- if len(args) == 0:
- raise ValueError("You must give at least one generator")
- if len(args) > 1 and channels != len(args):
- msg = "If more than one generator given, the count must match the \
- channel argument. But %d generator given and channels is %d"
- raise ValueError(msg % (len(args), channels))
-
- if len(args) == 1:
- if channels == 1:
- nxt_sample = lambda waves: [next(waves[0])]
- else:
- nxt_sample = lambda waves: [next(waves[0])]*2
- else:
- nxt_sample = lambda waves: map(next, waves)
-
- if bitsz != 8 or signed:
- nxt_arg = lambda: nxt_sample(args)
- gen = _output_sampler_multibits(nxt_arg, channels, samples, freq, bitsz,
- endianess, signed)
- while True:
- yield next(gen)
- raise StopIteration()
- return
-
- buff = bytearray([0 for _ in range(samples*channels)])
- while True:
- for i in range(0,samples*channels, channels):
- buff[i:i+channels] = nxt_sample(args)
- yield buff
-
- def _output_sampler_multibits(nxt_sample, channels, samples, freq, bitsz,
- endianess, signed):
- if bitsz not in (8,16,32):
- raise ValueError("Bitsize must be in (8,16,32) but %d given" % bitsz)
- Bsz = bitsz//8
- buff = bytearray([0 for _ in range(samples * channels * Bsz)])
- while True:
- for i in range(0, samples*channels*Bsz, channels*Bsz):
- nxt = nxt_sample()
- for j in range(channels):
- st = i + (j*Bsz)
- buff[st:st+Bsz] = nxt[j].to_bytes(Bsz, endianess,
- signed=signed)
- yield buff
|